当前位置: 首页 > article >正文

PDF书籍《手写调用链监控APM系统-Java版》第6章 链路的架构(Trace+TraceSegment+Span)

本人阅读了 Skywalking 的大部分核心代码,也了解了相关的文献,对此深有感悟,特此借助巨人的思想自己手动用JAVA语言实现了一个 “调用链监控APM” 系统。本书采用边讲解实现原理边编写代码的方式,看本书时一定要跟着敲代码。

作者已经将过程写成一部书籍,奈何没有钱发表,如果您知道渠道可以联系本人。一定重谢。

本书涉及到的核心技术与思想

JavaAgent , ByteBuddy,SPI服务,类加载器的命名空间,增强JDK类,kafka,插件思想,切面,链路栈等等。实际上远不止这么多,差不多贯通了整个java体系。

适用人群

自己公司要实现自己的调用链的;写架构的;深入java编程的;阅读Skywalking源码的;

版权

本书是作者呕心沥血亲自编写的代码,不经同意切勿拿出去商用,否则会追究其责任。

原版PDF+源码请见:

本章涉及到的工具类也在下面:

PDF书籍《手写调用链监控APM系统-Java版》第1章 开篇介绍-CSDN博客

第6章 链路的架构(Trace+TraceSegment+Span)

经过前面章节的洗礼,能坚持读到这里的已是难能可贵,也证明你离胜利已经不远了。

本章介绍的是在前面章节的切面环绕方法进行链路埋点, 将一系列方法请求调用的信息抽象成链路(Trace),然后发送到后端OAP。请求调用包含 “http调用”,“DB调用”,“MQ调用”,“Cache调用”,“RPC调用”,“本地方法调用”。复杂的微服务系统就是由以上6种调用组成错综复杂的链路。调用链监控系统就是采集N条链路然后进行监控。

一条链路我们又可以抽象出TraceSegment和Span的概念,下面我们来详细讲解下链路的知识。

5.1 链路的理论知识

5.1.1 Trace的介绍

Trace就是一条链路,是指一个请求或者一个操作从开始到结束的完整路径。Trace结束后会被立马发送到后端。

比如浏览器访问下单接口,首先请求到达网关,此时一条链路就开始了,会分配一个唯一taceId作为标识,直到这个下单接口的网关返回给浏览器了,这条trace便结束,然后被立马发送到kafka。

实际上我们后面编写代码是没有Trace这个类的,是一个抽象概念,只有traceId这个实际的字符串存在。

5.1.2 TraceSegment的介绍

一个Trace由很多的TraceSegment组成。TraceSegment是包含了JVM线程,或进程的一些列操作,也就是说,如果方法执行时开辟了新的线程,就会新生成一个TraceSegment记录调用信息, 如果时发起了rpc请求(跨进程)也会生成一个TraceSegment。

比如下单接口进入到了下单服务,下单服务首先调用本地的一个校验参数方法,由于没有跨线程或进程,到这里都只构建一个TraceSegment。当发起RPC调用查询库存服务库存是否充足时,此时就会产生一个新的TraceSegment,新的TraceSegment会持有一个TraceSegmentRef,指向前一个TraceSegment信息 ,这样就把调用串联起来了。

TraceSegment也有自己的id,同时也有traceId ,一条链路的所有TraceSegment里面的traceId都是相同的。后端分析时,只需要查询出traceId相同的TraceSegment集合,就可以分析一条链路了。

5.1.3 Span的介绍

TraceSegment下面还有Span,Span就是最小粒度了,代表一个具体的操作, 也就是上面提到过的“http调用”,“DB调用”,“MQ调用”,“Cache调用”,“RPC调用”,“本地方法调用”。

TraceSegment 就是由很多具体的span组成,在后面代码中,TraceSegment 类里面会有一个List集合,里面就是存储的当前所属的span。

Span里面包含很多关于调用的信息,都是上报到后端的信息。

第一个是operationName:当前的操作的名称,如果是http接口的话,这个就是请求的url地址。

第二个是component:当前是哪个组件,比如Tomcat,Mysql等。

第三个是tag信息:tag是一个map数组,记录一些额外信息,比如当前是HTTP请求,tag就会记录http.method=GET|POST 等。

第四个是spanLayer:layer是一个枚举,里面包含:DB(1), RPC_FRAMEWORK(2), HTTP(3), MQ(4), CACHE(5);

第五个是log:记录的是发生错误时的错误堆栈信息。

第六个是time:包含span的创建时间和结束时间,结束就代表发起了另一个span或者请求结束。

每个span也有自己的整型spanId,在一个TraceSegment里面的所有的span的ID都是自增长的,从0开始,也就是说第一个span的id为0 ,span里面也有parentSpanId用于指向前一个span的id。第一个span的parentSpanId为-1 。

通过对调用的总结与抽象,我们可以将Span分为三大类型:EntrySpan,LocalSpan,ExitSpan。

EntrySpan

代表一个请求入口的类型的span, 比如一个下单请求进入了下单服务的controller,首先请求会进入tomcat框架层,此时就会被tomcat的插桩插件拦截处理,创建EntrySpan ,并且设置好operationName,layer,tag等信息。

但是请求后面还会进入springmvc ,然后在到我们的controller接口, 进入springmvc插桩插件时不会创建新的EntrySpan , 而是复用这个tomcat层创建的EntrySpan ,但是会覆盖之前的operationName,layer,tag。 这种复用节约资源的操作是最合理的设计。所以如果是连着两个都是Entry Span的就会产生复用逻辑,且信息记录的是靠近后面的信息。

值得注意的是,如果是调用查询redis或者mysql时,就不是创建EntrySpan, 而是后面要说的ExitSpan 。

LocalSpan

这个就很简单了,代表的是一个本地方法的调用,注意不是native方法,就是普通方法的调用。

ExitSpan

是链路中一个退出的点或者离开的Span,可以简单理解为离开当前线程或进程的操作。

拿我们之前的下单接口的例子来讲,请求进到tomcat,然后进入springmvc,然后进入到我们的下单方法,下单方法里面又有一个redis的调用和mysql的调用。

当请求redis时,由于是离开了当前进程,会创建一个ExitSpan , 然后请求mysql,又会创建一个ExitSpan ,这个ExitSpan 和请求redis创建的两者之前没有任何关系。

如果下单接口里又通过feign调用了库存服务,这时会有ExitSpan 的复用逻辑 首先通过feign时,会被feign的插桩插件新建一个ExsitSpan,并且设置好operationName,layer,tag等信息。

然后feign通过httpclient发起http调用。此时httpclient的插桩插件会复用feign创建的ExsitSpan , 并且不会覆盖之前的operationName,layer,tag等信息,这个正好与EntrySpan相反。

写到这里,不知道读者能不能大概明白了链路的基本模型。可以反复几次读上面的理论知识,相信你一定能理解清楚,理解后,我们后面的编码会轻松很多。

5.2 链路TraceSegment,Span的编码实现

5.2.1 TraceSegment的实现

经过前面理论知识的洗礼,我们很容易写出TraceSegment的实现,在apm-agent-core 模块下,新建类:

com.hadluo.apm.agentcore.trace.TraceSegment

public class TraceSegment {
    // 指向上一个 segment
    @Setter
    private TraceSegmentRef ref ;
    // 当前 segment的 所有 span
    private List<AbstractSpan> spanList = new ArrayList<>();
    // segment的 id
    private String traceSegmentId ;
    // 一条跟踪链路的唯一id
    @Setter
    private String traceId ;
    // 创建时间
    private long createTime;
    public TraceSegment() {
// 工具类生成唯一id
        this.traceSegmentId = GlobalIdGenerator.generate();
        this.createTime = System.currentTimeMillis();
        this.traceId = GlobalIdGenerator.generate() ;
    }
    // 添加一个span 到 当前segment
    public void addSpan(AbstractSpan span) {
        this.spanList.add(span);
    }
}

TraceSegment有指向上一个的ref,还有装span的集合,以及traceId,traceSegmentId 等。这里traceId在构造函数中赋值了,不是说一条链路的traceId都一样吗? 这里我们的traceId 字段上面还有一个@Setter,这个是lombok注解,提供了set重新设置这个traceId 的方法,所以会保证traceId 都是同一个的。

GlobalIdGenerator为工具类,在apm-commons里面。

TraceSegmentRef 需要建立在apm-commons项目下,后面会公用到,在apm-commons项目新建类:

com.hadluo.apm.commons.trace.TraceSegmentRef

public class TraceSegmentRef {
    public enum SegmentRefType {
        /*是跨进程产生的 trace segment*/
        CROSS_PROCESS,
        /*跨线程产生的新 trace segment*/
        CROSS_THREAD
    }
    private SegmentRefType type;
    private String traceId;
    private String traceSegmentId;
    // trace segment里面最后一个span id
    private int spanId;
    // 当前的服务名称
    private String parentServiceName;
    // 实例名称
    private String parentServiceInstance ;

    public TraceSegmentRef(ContextCarrier carrier) {
        this.type = SegmentRefType.CROSS_PROCESS;
        this.traceId = carrier.getTraceId();
        this.traceSegmentId = carrier.getTraceSegmentId();
        this.spanId = carrier.getSpanId();
        this.parentServiceName = carrier.getParentServiceName();
        this.parentServiceInstance = carrier.getParentServiceInstance();
    }
}

这个TraceSegmentRef 就相当于TraceSegment。

之前我们讲到过,一个线程或进程里面操作就是一个TraceSegment, 如果产生一个TraceSegment必定就是跨线程或者进程操作,所以会有SegmentRefType 作为标识。其余的都是id的基本信息和服务名称等。

ContextCarrier 为跨线程或者进程传输的直接载体类,相当于一个bean对象。 当发生HTTP跨进程调用时,会把当前链路信息像traceId等,设置到http 的请求头里面,收到的服务就会解析生成ContextCarrier ,然后生成TraceSegmentRef ,就将TraceSegment进行了串联。

在apm-commons中新建类:

com.hadluo.apm.commons.trace.ContextCarrier

@Data
public class ContextCarrier {
    private String traceId;
    private String traceSegmentId;
    // 最后一个span id
    private int spanId;
    private String parentServiceName;
    private String parentServiceInstance;

    public boolean isEmpty() {
        return traceId == null || traceId.isEmpty();
    }
}

isEmpty 方法可以判断出上游是否有携带数据。 想象一下,如果是请求刚进入下单接口网关,此时是没有ContextCarrier 的,到下一层之前,会把ContextCarrier 里面的数据,设置到http请求头中,然后进行跨进程传递,下一层就会构造出isEmpty为false的ContextCarrier ,从而就得到了正确的TraceSegmentRef 。

5.2.2 span的实现

Span相对复杂一点,因为有三种类型,首先我们定义一个抽象span基类,在apm-commons项目下新建类:

com.hadluo.apm.commons.trace.AbstractSpan

public interface AbstractSpan {
    // 设置 当前span 的操作 的插件名称, 比如 :tomcat插件,mysql插件等
    AbstractSpan setComponent(String component);
    // 插件的分层 : DB层 , cache缓存层, rpc层,  http层 , mq层
    AbstractSpan setLayer(SpanLayer spanLayer);
    // 设置操作名称
    AbstractSpan setOperationName(String operationName);
    // 设置 一些 tag 值
    AbstractSpan setTag(String key, String value);
    // 开启
    AbstractSpan start();
    // 结束
    AbstractSpan finish();
    // 获取父亲spanid
    int getParentSpanId();
    // 获取当前的spanId
    int getSpanId();
    // 设置 当前span 所在trace segment 的前一个 ref
    void ref(TraceSegmentRef ref);
    // 记录错误
    AbstractSpan log(Throwable t);
}

以上方法都是操作span基本属性的方法,只有start和finish比较特殊。这两个方法标志span的开始和结束的一些动作。

SpanLayer 为一个枚举类, 在apm-commons目录下新建类:

com.hadluo.apm.commons.trace.SpanLayer

public enum SpanLayer {
    DB(1), RPC_FRAMEWORK(2), HTTP(3), MQ(4), CACHE(5);
    private int code;
    SpanLayer(int code) {
        this.code = code;
    }
    public int getCode() {
        return code;
    }
}

然后新建一个抽象基本功能的实现类,在apm-agent-core模块新建类:

com.hadluo.apm.agentcore.trace.AbstractTracingSpan

public abstract class AbstractTracingSpan implements AbstractSpan {
    // 当前 spanId
    @Setter
    private int spanId;
    // 上一级 的spanId
    private int parentSpanId;
    // 当前span操作
    private String operationName;
    private String componentName;
    // tag
    private final Map<String, String> tag = new HashMap<String, String>();
    // 当前span所在的segment的 前一个segment , 当是批量线程调用时,就会是多个
    protected final List<TraceSegmentRef> refs = new ArrayList<>();
    // 当前span操作的分层: DB,CACHE,RPC,HTTP,MQ
    private SpanLayer spanLayer;
    // 开始时间
    private long startTime;
    // 结束时间
    private long endTime;
    // 错误堆栈
    private Map<String, String> log = new HashMap<>();

    protected AbstractTracingSpan(int spanId, int parentSpanId, String operationName) {
        this.operationName = operationName;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
    }

    @Override
    public AbstractSpan setComponent(String component) {
        this.componentName = component;
        return this;
    }
    @Override
    public AbstractSpan setLayer(SpanLayer spanLayer) {
        this.spanLayer = spanLayer;
        return this;
    }
    @Override
    public AbstractTracingSpan setOperationName(String operationName) {
        this.operationName = operationName;
        return this;
    }
    @Override
    public AbstractSpan setTag(String key, String value) {
        tag.put(key, value);
        return this;
    }
    @Override
    public AbstractSpan finish() {
        this.endTime = System.currentTimeMillis();
        return this;
    }
    @Override
    public AbstractSpan start() {
        this.startTime = System.currentTimeMillis();
        return this;
    }
    @Override
    public int getParentSpanId() {
        return parentSpanId;
    }
    @Override
    public int getSpanId() {
        return spanId;
    }
    @Override
    public void ref(TraceSegmentRef ref) {
        refs.add(ref);
    }
    @Override
    public AbstractSpan log(Throwable t) {
        log.put("time", System.currentTimeMillis() + "");
        log.put("message", t.getMessage());
        // 取4000长度
        log.put("stack", Logs.convert2String(t, 4000));
        return this;
    }
}

AbstractTracingSpan抽象类很简单,就是实现了对span基本属性的设置操作,span的基本属性在前面讲解理论也提到过,这里不在赘述。

然后开始实现具体的EntrySpan和ExitSpan,我们前面理论知识提到过,一个请求进入tomcat然后到springmvc,这两个插件的EntrySpan要被复用,只有一个,并且oprationName,tag,layer等记录的是SpringMVC层的信息,而ExitSpan的模式正好相反。

这里我们通过一个模拟的栈来实现上述功能。我们假象EntrySpan,ExitSpan 都有一个stackDepth栈深度属性,EntrySpan还有一个maxStackDepth最大栈深的属性。

EntrySpan的模拟

当请求到tomcat构建出EntrySpan时,stackDepth栈深度和maxStackDepth都从0加到1,进入springmvc层又都加1,设置oprationName,tag,layer信息时,判断stackDepth==maxStackDepth时,才设置,是不是调用越深,记录的就是靠近最里层的信息,当springmvc方法返回时,stackDepth减1,但是maxStackDepth不变,这样stackDepth就不会等于maxStackDepth,就不会覆盖oprationName,tag,layer信息。

ExitSpan 的模拟

ExitSpan 只有一个stackDepth栈深度的属性,当stackDepth==1时,记录oprationName,tag,layer信息,这样就保证了记录第一次的信息。

由于两者都有一个stackDepth栈深度属性,所以还可以抽象一层基于栈的抽象类,在apm-agent-core模块新建类:

com.hadluo.apm.agentcore.trace.StackBasedTracingSpan

public abstract class StackBasedTracingSpan extends AbstractTracingSpan {
    // 当前栈深度
    int stackDepth;
    protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName) {
        super(spanId, parentSpanId, operationName);
    }
// 创建span的方法返回时会调用
    @Override
    public AbstractSpan finish() {
        if (--stackDepth == 0) {
            // 减到0代表栈为空了
            super.finish();
        }
        return this;
    }
}

这个finish方法并不是代表span结束,只有stackDepth 减到0时,才代表当前span结束,前面提到过,请求从tomcat到springmvc,stackDepth 的值会加到2,在springmvc方法返回时,会调用finish,stackDepth 的值减到1,此时并不是span结束,当tomcat层的方法返回时,stackDepth 的值减到0,span才会结束,然后调用父类的finish,记录结束时间, 此时这个span就真正结束了,就该加入到trace segment里面归档了。

最后一层就是EntrySpan代码,在apm-agent-core模块新建类:

com.hadluo.apm.agentcore.trace.EntrySpan

public class EntrySpan extends StackBasedTracingSpan {
    // 最大栈深,只增不减
    private int maxStackDepth;
    protected EntrySpan(int spanId, int parentSpanId, String operationName) {
        super(spanId, parentSpanId, operationName);
    }
    @Override
    public AbstractSpan start() {
        // 当前栈深加1
        stackDepth = stackDepth + 1;
        // 赋值给最大栈深
        maxStackDepth = stackDepth;
        if(stackDepth == 1){
            // 第一次进来
            super.start();
        }
        return this;
    }

    @Override
    public AbstractSpan setTag(String key, String value) {
        // 比如:一个请求先进到Tomcat插件,然后进入到SpringMVC插件
        // 进到 Tomcat 时,创建了entry span调用 start方法,stackDepth=1 , maxStackDepth=1 , 记录tag
        // 在进入到SpringMvc时, 会复用span,但是会调用start方法,stackDepth=2 , maxStackDepth=2, 覆盖tag
        // 出来时,调 finish,stackDepth减1 ,maxStackDepth不变, tag值不变
        // 所以就记录的是 SpringMvc时的tag信息,也就是靠近里层的信息
        if(maxStackDepth == stackDepth){
            return super.setTag(key, value);
        }
        return this;
    }

    @Override
    public AbstractSpan setLayer(SpanLayer spanLayer) {
        // 同理 setTag
        if(maxStackDepth == stackDepth){
            return super.setLayer(spanLayer);
        }
        return this;
    }

    @Override
    public AbstractTracingSpan setOperationName(String operationName) {
        // 同理 setTag
        if(maxStackDepth == stackDepth){
            return super.setOperationName(operationName);
        }
        return this;
    }
}

EntrySpan的实现关键就在于 maxStackDepth 和stackDepth的管理,以及判断 maxStackDepth 和stackDepth相等时,才设置有用的信息。

ExitSpan 代码,在apm-agent-core模块新建类:

com.hadluo.apm.agentcore.trace.ExitSpan

public class ExitSpan extends StackBasedTracingSpan {
    protected ExitSpan(int spanId, int parentSpanId, String operationName) {
        super(spanId, parentSpanId, operationName);
    }
    @Override
    public AbstractSpan start() {
        stackDepth = stackDepth + 1;
        if (stackDepth == 1) {
            super.start();
        }
        return this;
    }

    @Override
    public AbstractSpan setLayer(SpanLayer spanLayer) {
        if(stackDepth == 1){
            // 只有第一次会记录
            return super.setLayer(spanLayer);
        }
        return this;
    }

    @Override
    public AbstractSpan setTag(String key, String value) {
        if(stackDepth == 1){
            // 只有第一次会记录
            return super.setTag(key, value);
        }
        return this;
    }

    @Override
    public AbstractTracingSpan setOperationName(String operationName) {
        if(stackDepth == 1){
            // 只有第一次会记录
            return super.setOperationName(operationName);
        }
        return this;
    }
}

ExitSpan 的实现关键就在于 stackDepth的管理,以及判断 stackDepth == 1时,才设置有用的信息。

Span的类型还缺一个LocalSpan , 这个比较简单,没有复用的栈逻辑。LocalSpan 代码,在apm-agent-core模块新建类:

com.hadluo.apm.agentcore.trace.LocalSpan

public class LocalSpan extends AbstractTracingSpan {
    protected LocalSpan(int spanId, int parentSpanId, String operationName) {
        super(spanId, parentSpanId, operationName);
    }
}

其实为了逻辑流程的通用性,我们还需要一个忽略的Span类型,比如,当我们的采样率控制服务判断链路不需要采样时,为了流程的通用性,我们还是要构建一个Span,只是这个类型为一个忽略的Span类型,当发送到OAP时,我们判断这个Span类型就过滤掉就好。

忽略的Span类型代码,在apm-agent-core模块新建类:

com.hadluo.apm.agentcore.trace.LoopSpan

public class LoopSpan implements AbstractSpan {

// 里面的实现方法都是空实现,没有逻辑

}

5.3 链路上下文

上小节部分我们实现了TraceSegment和Span, 但是我们还需要一个context去管理它们。

前面说到了span内部为了保证EntrySpan记录最深一层信息,和ExitSpan记录第一层信息设计了一个类似栈的结构。其实我们的TraceSegment里面的所有Span也是一个真实的栈结构,这个栈不同于上面的说的栈,两者没有关系。

请求调用本地方法,或者rpc方法,都是一个TraceSegment里面的Span的入栈和出栈的操作,当栈为空时,证明这个TraceSegment完结,就可以发送到OAP后端了。这些Span的管理,和入栈出栈就交给我们的链路管理的上下文(AbstraceTraceContext )。这里一个AbstraceTraceContext 对应一个TraceSegment。

当然AbstraceTraceContext还有一些其他的功能,我们直接编写AbstraceTraceContext代码,在apm-commons模块新建类:

com.hadluo.apm.commons.trace.AbstraceTraceContext

public abstract class AbstraceTraceContext {
    // 创建 entry span
    public abstract AbstractSpan createEntrySpan(String operationName);
    // 创建 local span
    public abstract AbstractSpan createLocalSpan(String operationName);
    // 创建 exit span
    public abstract AbstractSpan createExitSpan(String operationName, String remotePeer);
    /***
     * 跨进程调用时, 将ContextCarrier设置到当前trace segment 的ref上
     * @param carrier
     */
    public abstract void extract(ContextCarrier carrier);
    /**
     * 创建 entry span的 链路方法 结束时调用
     */
    public abstract void stopSpan();
    /**
     * 获取栈顶的span
     * @return
     */
    public abstract AbstractSpan acviveSpan();
    /**
     * 当前span栈是否为空
     * @return
     */
    public abstract boolean isEmpty() ;
}

它的实现类代码,在apm-agent-core模块新建类:

com.hadluo.apm.agentcore.trace.TracingContext

public class TracingContext extends AbstraceTraceContext {
    // 对应的 trace segment
    private final TraceSegment traceSegment ;
    // span 栈
    private final LinkedList<AbstractSpan> spanStack = new LinkedList<>();
    // span id 自增器
    private final AtomicInteger spanIdGenerator = new AtomicInteger(0);
    // kafka发送服务
    private final KafkaProducerManager kafkaProducerManager ;
    public TracingContext(){
        this.traceSegment = new TraceSegment() ;
        kafkaProducerManager = ServiceManager.INSTANCE.getService(KafkaProducerManager.class);
    }
    // 出栈,但栈内元素不变
    private AbstractSpan pop(){
        try {
            return spanStack.getLast() ;
        }catch (NoSuchElementException e){
            return null ;
        }
    }
    // 入栈
    private void push(AbstractSpan span){
        spanStack.addLast(span);
    }

    public AbstractSpan createEntrySpan(String operationName) {
    }

    public AbstractSpan createLocalSpan(String operationName) {
    }

    @Override
    public AbstractSpan createExitSpan(String operationName, String remotePeer) {
    }

    @Override
    public void extract(ContextCarrier carrier) {
    }

    @Override
    public void stopSpan() {
    }
    @Override
    public AbstractSpan acviveSpan() {
        return pop();
    }
    public boolean isEmpty(){
        return spanStack.isEmpty();
    }
}

TracingContext的核心就是维护span栈,通过LinkedList实现栈。一个TracingContext对应一个TraceSegment, 其实这个Context就是TraceSegment的辅助类。还有几个重要方法未实现单独提出来讲。

createEntrySpan方法代码如下:

public AbstractSpan createEntrySpan(String operationName) {
    // 获取栈顶, 不弹出栈元素
    AbstractSpan parent = pop() ;
    AbstractSpan entrySpan;
    // 设置 父span的id, 没有parent就 为-1, 否则就是parent的id
    int parentSpanId = (parent == null?-1:parent.getSpanId()) ;
    if(parent != null && parent instanceof EntrySpan){
        // 这里很重要, 要复用span,两个相邻的span都是entry span 就要发生复用
        parent.setOperationName(operationName);
        entrySpan = parent;
        return entrySpan.start();
    }
    // 真正创建
    entrySpan = new EntrySpan( spanIdGenerator.getAndIncrement()  , parentSpanId,operationName ) ;
    // 入栈
    push(entrySpan);
    return entrySpan.start();
}

上述代码关键就在于entry span的复用,因为之前举了接口先到tomcat然后在到springmvc , 两个插件内执行时都要创建entry span ,这就是相邻两个都是entry span的场景, 要复用tomcat插件创建的entry span,而信息设置的是springmvc 插件的方法信息。

如果不是相邻的entry span,就要创建一个新的,然后入栈。

createLocalSpan方法代码如下:

public AbstractSpan createLocalSpan(String operationName) {
    AbstractSpan parent = pop() ;
// 设置 父span的id, 没有parent就 为-1, 否则就是parent的id
    int parentSpanId = (parent == null?-1:parent.getSpanId()) ;
    LocalSpan localSpan = new LocalSpan( spanIdGenerator.getAndIncrement()  , parentSpanId,operationName ) ;
    // 入栈
    push(localSpan);
    return localSpan;
}

LocalSpan没有复用的逻辑, 直接创建一个新的,然后入栈。

createExitSpan方法代码如下:

@Override
public AbstractSpan createExitSpan(String operationName, String remotePeer) {
    AbstractSpan parent = pop() ;
    int parentSpanId = (parent == null?-1:parent.getSpanId()) ;
    if(parent instanceof ExitSpan){
        // 要复用这个span
        parent.start();
        return parent;
    }
    ExitSpan exitSpan = new ExitSpan( spanIdGenerator.getAndIncrement()  , parentSpanId,operationName ) ;
    exitSpan.start();
// 将远端地址设置到tag里面
    exitSpan.setTag("remotePeer" , remotePeer) ;
    //入栈
    push(exitSpan);
    return exitSpan;
}

前面提到过ExitSpan理论,当feign插件调用httpclient插件发起接口请求时,这种属于方法的嵌套(前面方法还没返回),也是相邻复用的逻辑。

如果是调redis然后调mysql,这种是不存在复用的逻辑的,因为调redis方法返回了然后在调的mysql,不属于嵌套关系,当后面用调mysql时,之前调redis的ExitSpan已经在redis方法返回时出栈了,所以parent不可能是ExitSpan。

ExitSpan还有一个特殊属性就是远端地址,比如:发起redis调用,会创建一个ExitSpan,远端地址就是redis集群的地址,然后设置到tag里面。

extract方法代码如下:

@Override
public void extract(ContextCarrier carrier) {
    if(carrier.isEmpty()){
        return ;
    }
    TraceSegmentRef ref = new TraceSegmentRef(carrier);
    this.traceSegment.setRef(ref);
    this.traceSegment.setTraceId(carrier.getTraceId());
    AbstractSpan span = pop();
    if (span instanceof EntrySpan) {
        span.ref(ref);
    }
}

extract 就是借助跨进程传递的ContextCarrier 对象信息生成TraceSegmentRef , 然后设置到当前TraceSegment的ref字段上,如果是entry span入口类型的,还需要添加到span的ref上。其实就是上一个trace segment的信息传递。

setTraceId这个方法相当重要,标志了一条链路上的所有trace segment的traceId一样。

stopSpan方法代码如下:

@Override
public void stopSpan() {
    AbstractSpan span = pop();
    // span 的结束
    span.finish();
    // 将span加到segment 中
    this.traceSegment.addSpan(span);
    // 移除栈顶
    spanStack.removeLast();
    if(spanStack.isEmpty()){
        // 将 segment 发送到 后端
        kafkaProducerManager.send(this.traceSegment.transtorm());
    }
}

一个span结束后, 要将span归档到trace segment里面 , 当span栈为空时,代表这个trace segment结束,需要将数据发送到后端,但是发送的对象并不是原生的TraceSegment, 而是通过transtorm方法复制的新对象。下面我们实现下真正发送到kafka的数据对象和transtorm方法。

TraceSegment的transtorm方法代码如下:

public Segment transtorm(){
    // 转换成 kafka发送的数据
    Segment  segment = new Segment();
    segment.setTraceSegmentId(traceSegmentId);
    segment.setTraceId(traceId);
    segment.setSpans(new ArrayList<>());
    spanList.forEach(item->segment.getSpans().add(item.transtform()));
    segment.setMsgTypeClass(Segment.class.getName());
    segment.setServiceName(Config.Agent.serviceName);
    segment.setServiceInstance(Config.Agent.serviceInstance);
    return segment ;
}

封装span时,又调用了span的transtorm,代码如下:

public Segment.Span transtform() {
    Segment.Span span = new Segment.Span();
    span.setSpanId(spanId);
    span.setParentSpanId(parentSpanId);
    span.setStartTime(startTime);
    span.setEndTime(endTime);
    span.setOperationName(operationName);
    if (this instanceof EntrySpan) {
        span.setSpanType("Entry");
    } else if (this instanceof LocalSpan) {
        span.setSpanType("Local");
    } else {
        span.setSpanType("Exit");
    }
    if(spanLayer != null){
        span.setSpanLayer(spanLayer.toString());
    }
    span.setComponent(componentName);
    span.setLogs(log);
    span.setRefs(new ArrayList<>());
    this.refs.forEach(item -> span.getRefs().add(item.transform()));
    span.setTags(tag);
    return span;
}

还需要在AbstractSpan上添加transtform,我就不写了。还调用了TraceSegmentRef的transform,代码如下:

public Segment.SegmentReference transform(){
    Segment.SegmentReference reference = new Segment.SegmentReference();
    reference.setRefType(type.toString());
    reference.setTraceId(traceId);
    reference.setParentTraceSegmentId(traceSegmentId);
    reference.setParentSpanId(spanId);
    reference.setParentService(parentServiceName);
    reference.setParentServiceInstance(parentServiceInstance);
    return reference ;
}

然后在apm-commons 新建kafka发送的实体,新建类:

com.hadluo.apm.commons.kafka.Segment

@Data
public class Segment extends BaseMsg{
    private String traceId ;
    private String traceSegmentId;
    private List<Span> spans ;
    @Data
    public static class Span {
        private int spanId;
        private int parentSpanId;
        private long startTime;
        private long endTime;
        private List<SegmentReference> refs ;
        private String operationName;
        private String peer;
        private String spanType;
        //    DB(1), RPC_FRAMEWORK(2), HTTP(3), MQ(4), CACHE(5);
        private String spanLayer ;
        private String component;
        private Map<String , String> tags ;
        private Map<String , String> logs ;
    }
    @Data
    public static class SegmentReference {
        private String refType;
        private String traceId;
        private String parentTraceSegmentId;
        private int parentSpanId ;
        private String parentService;
        private String parentServiceInstance;
        private String networkAddressUsedAtPeer;
    }
}

到此链路上下文,我们基本实现,后续还会添加新方法。当然,类似前面的LoopSpan忽略的Span逻辑,我们同样也有一个忽略的Context,在apm-agent-core项目下新建类:

com.hadluo.apm.agentcore.trace.LoopTraceContext:

public class LoopTraceContext extends AbstraceTraceContext {
    private final LoopSpan INSTANCE = new LoopSpan();
    @Override
    public AbstractSpan createEntrySpan(String operationName) {
        return INSTANCE;
    }
    @Override
    public AbstractSpan createLocalSpan(String operationName) {
        return INSTANCE;
    }
    @Override
    public AbstractSpan createExitSpan(String operationName, String remotePeer) {
        return INSTANCE;
    }

    @Override
    public AbstractSpan acviveSpan() {
        return INSTANCE;
    }
}

方法也都是空的,入栈和出栈都是LoopSpan一个实例。

5.4 链路上下文管理器服务

前面说到一个上下文对应着一个TraceSegment,而TraceSegment是一个线程的所有Span操作。所以上下文跟线程绑定,这里我们把上下文放入到一个ThreadLocal中进行管理,于是我们又设计了上下文管理器(TraceContextManager) , 用来管理上下文,这个管理器还是一个BootService服务。

在apm-commoms模块中,新建类:

com.hadluo.apm.commons.trace.TraceContextManager

public class TraceContextManager implements BootService {
    // 采样服务
    private SamplingService samplingService;
    // 持有 上下文 的 ThreadLocal
    private ThreadLocal<AbstraceTraceContext> CONTEXT = new ThreadLocal<>();

    // 从ThreadLocal中取 上下文
    private AbstraceTraceContext getOrCreate(boolean passed) {
        if (CONTEXT.get() == null) {
            if(!passed){
                try {
                    CONTEXT.set((AbstraceTraceContext) Class.forName("com.hadluo.apm.agentcore.trace.LoopTraceContext").newInstance());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }else{
                try {
                    CONTEXT.set((AbstraceTraceContext) Class.forName("com.hadluo.apm.agentcore.trace.TracingContext").newInstance());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return CONTEXT.get();
    }

    public AbstractSpan createEntrySpan(String operationName, ContextCarrier contextCarrier) {
        AbstraceTraceContext context  ;
        if (contextCarrier == null || contextCarrier.isEmpty()) {
            //携带参数为空, 前面没有链路的调用
            context =  getOrCreate(samplingService.trySampling());
        }else {
            // 前面的调用链路是已经采样了, 后续的调用 也必须要采样
            context =  getOrCreate(true);
        }
        AbstractSpan span =  context.createEntrySpan(operationName);
        context.extract(contextCarrier);
        return span;
    }

    public AbstractSpan createLocalSpan(String operationName) {
        return getOrCreate(true).createLocalSpan(operationName);
    }

    public AbstractSpan createExitSpan(String operationName, String remotePeer){
        return getOrCreate(true).createExitSpan(operationName,remotePeer);
    }

    public void stopSpan(){
        AbstraceTraceContext context = CONTEXT.get();
        context.stopSpan();
        if(context.isEmpty()){
            // 栈已经是空的了,需要将线程变量移除
            CONTEXT.remove();
        }
    }

    public AbstractSpan activeSpan(){
        return CONTEXT.get().acviveSpan();
    }

    @Override
    public void prepare() throws Throwable {
        this.samplingService = ServiceManager.INSTANCE.getService(SamplingService.class);
    }
}

TraceContextManager 维护了线程ThreadLocal ,代理了上下文的创建span的几个方法。

createEntrySpan 需要单独说明下,ContextCarrier 参数是跨进程或跨线程传递的上一级TraceSegment的信息,在之前实现TraceSegment的时候已经实现过。判断如果是携带ContextCarrier 参数的,代表前面的TraceSegment已经存在,则后面的TraceSegment必须要采样(链路不能断掉)。

context.extract方法就是将ContextCarrier的值设置到 TraceSegmentRef ,这个 TraceSegmentRef 就是指向上一级的TraceSegment信息,之前也提到过。

5.5 本章小结

本章是整个链路监控的数据结构核心,通过对链路的抽象将链路划分为具体的TraceSegment,Span等。他们的结构如图(图片摘抄于网络):

本章还介绍了EntrySpan的类似栈设计,通过stackDepth(当前栈深度)和maxStackDepth(最大栈深度)灵活的控制了怎么保证记录靠近调用内测的信息。

本章还介绍了链路的上下文AbstraceTraceContext,一个上下文对应一个TraceSegment, 通过这个上下文对所属的所有Span也进行了栈管理,这是一个真实先进后出的栈,值得注意的是入栈时,需要保证EntrySpan和ExitSpan的复用对象逻辑。出栈时,当栈为空,表示这个TraceSegment已经结束需要归档发送到kafka。

由于TraceSegment属于线程里面的操作,所以还创作了一个基于ThreadLocal的下文的管理器服务TraceContextManager,这个服务通过线程ThreadLocal来管理链路的上下文AbstraceTraceContext。

本章没有进行代码测试,因为可能一些细节后续还要修改,等后面介绍具体的插桩插件如何调用TraceContextManager的创建Span方法,传递怎样的参数,再进行修改完善代码和测试。


http://www.kler.cn/a/454421.html

相关文章:

  • 面试经典 150 题——数组/字符串(一)
  • Python基础语法知识——列表、字典、元组与集合
  • IPD管理体系框架架应用实践
  • LeetCode 83 :删除排链表中的重复元素
  • 【报错】node:internal/modules/cjs/loader:936
  • 【ES6复习笔记】集合Set(13)
  • SQL进阶技巧:如何分析工厂制程顺序问题?
  • GXUOJ-算法-第一次作业
  • Mysql数据究竟是如何存储的
  • flink cdc各种数据库 jar下载地址
  • 【Java 学习】详细讲解---包和导包、Scanner类、输入源
  • 32. 线程、进程与协程
  • javaweb 04 springmvc
  • 【从零开始入门unity游戏开发之——C#篇30】C#常用泛型数据结构类——list<T>列表、`List<T>` 和数组 (`T[]`) 的选择
  • BFD综合详细实验配置案例
  • GitLab 服务变更提醒:中国大陆、澳门和香港用户停止提供服务(GitLab 服务停止)
  • vue3使用video-player实现视频播放(可拖动视频窗口、调整大小)
  • HTTP、HTTPS和SOCKS5代理協議
  • WSL2上Ubuntu22.04安装Docker
  • Windows 使用 非安装版MySQL 8
  • Linux网络——TCP的运用
  • QT集成intel RealSense 双目摄像头
  • NLP 中文拼写检测开源-01-基于贝叶斯公式的拼写检查器 CSC
  • Leetcode 394-字符串解码
  • MinIO服务器文件复制(Windows环境Linux环境)
  • LLMs之o3:《Deliberative Alignment: Reasoning Enables Safer Language Models》翻译与解读