PDF书籍《手写调用链监控APM系统-Java版》第6章 链路的架构(Trace+TraceSegment+Span)
本人阅读了 Skywalking 的大部分核心代码,也了解了相关的文献,对此深有感悟,特此借助巨人的思想自己手动用JAVA语言实现了一个 “调用链监控APM” 系统。本书采用边讲解实现原理边编写代码的方式,看本书时一定要跟着敲代码。
作者已经将过程写成一部书籍,奈何没有钱发表,如果您知道渠道可以联系本人。一定重谢。
本书涉及到的核心技术与思想
JavaAgent , ByteBuddy,SPI服务,类加载器的命名空间,增强JDK类,kafka,插件思想,切面,链路栈等等。实际上远不止这么多,差不多贯通了整个java体系。
适用人群
自己公司要实现自己的调用链的;写架构的;深入java编程的;阅读Skywalking源码的;
版权
本书是作者呕心沥血亲自编写的代码,不经同意切勿拿出去商用,否则会追究其责任。
原版PDF+源码请见:
本章涉及到的工具类也在下面:
PDF书籍《手写调用链监控APM系统-Java版》第1章 开篇介绍-CSDN博客
第6章 链路的架构(Trace+TraceSegment+Span)
经过前面章节的洗礼,能坚持读到这里的已是难能可贵,也证明你离胜利已经不远了。
本章介绍的是在前面章节的切面环绕方法进行链路埋点, 将一系列方法请求调用的信息抽象成链路(Trace),然后发送到后端OAP。请求调用包含 “http调用”,“DB调用”,“MQ调用”,“Cache调用”,“RPC调用”,“本地方法调用”。复杂的微服务系统就是由以上6种调用组成错综复杂的链路。调用链监控系统就是采集N条链路然后进行监控。
一条链路我们又可以抽象出TraceSegment和Span的概念,下面我们来详细讲解下链路的知识。
5.1 链路的理论知识
5.1.1 Trace的介绍
Trace就是一条链路,是指一个请求或者一个操作从开始到结束的完整路径。Trace结束后会被立马发送到后端。
比如浏览器访问下单接口,首先请求到达网关,此时一条链路就开始了,会分配一个唯一taceId作为标识,直到这个下单接口的网关返回给浏览器了,这条trace便结束,然后被立马发送到kafka。
实际上我们后面编写代码是没有Trace这个类的,是一个抽象概念,只有traceId这个实际的字符串存在。
5.1.2 TraceSegment的介绍
一个Trace由很多的TraceSegment组成。TraceSegment是包含了JVM线程,或进程的一些列操作,也就是说,如果方法执行时开辟了新的线程,就会新生成一个TraceSegment记录调用信息, 如果时发起了rpc请求(跨进程)也会生成一个TraceSegment。
比如下单接口进入到了下单服务,下单服务首先调用本地的一个校验参数方法,由于没有跨线程或进程,到这里都只构建一个TraceSegment。当发起RPC调用查询库存服务库存是否充足时,此时就会产生一个新的TraceSegment,新的TraceSegment会持有一个TraceSegmentRef,指向前一个TraceSegment信息 ,这样就把调用串联起来了。
TraceSegment也有自己的id,同时也有traceId ,一条链路的所有TraceSegment里面的traceId都是相同的。后端分析时,只需要查询出traceId相同的TraceSegment集合,就可以分析一条链路了。
5.1.3 Span的介绍
TraceSegment下面还有Span,Span就是最小粒度了,代表一个具体的操作, 也就是上面提到过的“http调用”,“DB调用”,“MQ调用”,“Cache调用”,“RPC调用”,“本地方法调用”。
TraceSegment 就是由很多具体的span组成,在后面代码中,TraceSegment 类里面会有一个List集合,里面就是存储的当前所属的span。
Span里面包含很多关于调用的信息,都是上报到后端的信息。
第一个是operationName:当前的操作的名称,如果是http接口的话,这个就是请求的url地址。
第二个是component:当前是哪个组件,比如Tomcat,Mysql等。
第三个是tag信息:tag是一个map数组,记录一些额外信息,比如当前是HTTP请求,tag就会记录http.method=GET|POST 等。
第四个是spanLayer:layer是一个枚举,里面包含:DB(1), RPC_FRAMEWORK(2), HTTP(3), MQ(4), CACHE(5);
第五个是log:记录的是发生错误时的错误堆栈信息。
第六个是time:包含span的创建时间和结束时间,结束就代表发起了另一个span或者请求结束。
每个span也有自己的整型spanId,在一个TraceSegment里面的所有的span的ID都是自增长的,从0开始,也就是说第一个span的id为0 ,span里面也有parentSpanId用于指向前一个span的id。第一个span的parentSpanId为-1 。
通过对调用的总结与抽象,我们可以将Span分为三大类型:EntrySpan,LocalSpan,ExitSpan。
EntrySpan
代表一个请求入口的类型的span, 比如一个下单请求进入了下单服务的controller,首先请求会进入tomcat框架层,此时就会被tomcat的插桩插件拦截处理,创建EntrySpan ,并且设置好operationName,layer,tag等信息。
但是请求后面还会进入springmvc ,然后在到我们的controller接口, 进入springmvc插桩插件时不会创建新的EntrySpan , 而是复用这个tomcat层创建的EntrySpan ,但是会覆盖之前的operationName,layer,tag。 这种复用节约资源的操作是最合理的设计。所以如果是连着两个都是Entry Span的就会产生复用逻辑,且信息记录的是靠近后面的信息。
值得注意的是,如果是调用查询redis或者mysql时,就不是创建EntrySpan, 而是后面要说的ExitSpan 。
LocalSpan
这个就很简单了,代表的是一个本地方法的调用,注意不是native方法,就是普通方法的调用。
ExitSpan
是链路中一个退出的点或者离开的Span,可以简单理解为离开当前线程或进程的操作。
拿我们之前的下单接口的例子来讲,请求进到tomcat,然后进入springmvc,然后进入到我们的下单方法,下单方法里面又有一个redis的调用和mysql的调用。
当请求redis时,由于是离开了当前进程,会创建一个ExitSpan , 然后请求mysql,又会创建一个ExitSpan ,这个ExitSpan 和请求redis创建的两者之前没有任何关系。
如果下单接口里又通过feign调用了库存服务,这时会有ExitSpan 的复用逻辑。 首先通过feign时,会被feign的插桩插件新建一个ExsitSpan,并且设置好operationName,layer,tag等信息。
然后feign通过httpclient发起http调用。此时httpclient的插桩插件会复用feign创建的ExsitSpan , 并且不会覆盖之前的operationName,layer,tag等信息,这个正好与EntrySpan相反。
写到这里,不知道读者能不能大概明白了链路的基本模型。可以反复几次读上面的理论知识,相信你一定能理解清楚,理解后,我们后面的编码会轻松很多。
5.2 链路TraceSegment,Span的编码实现
5.2.1 TraceSegment的实现
经过前面理论知识的洗礼,我们很容易写出TraceSegment的实现,在apm-agent-core 模块下,新建类:
com.hadluo.apm.agentcore.trace.TraceSegment
public class TraceSegment {
// 指向上一个 segment
@Setter
private TraceSegmentRef ref ;
// 当前 segment的 所有 span
private List<AbstractSpan> spanList = new ArrayList<>();
// segment的 id
private String traceSegmentId ;
// 一条跟踪链路的唯一id
@Setter
private String traceId ;
// 创建时间
private long createTime;
public TraceSegment() {
// 工具类生成唯一id
this.traceSegmentId = GlobalIdGenerator.generate();
this.createTime = System.currentTimeMillis();
this.traceId = GlobalIdGenerator.generate() ;
}
// 添加一个span 到 当前segment
public void addSpan(AbstractSpan span) {
this.spanList.add(span);
}
}
TraceSegment有指向上一个的ref,还有装span的集合,以及traceId,traceSegmentId 等。这里traceId在构造函数中赋值了,不是说一条链路的traceId都一样吗? 这里我们的traceId 字段上面还有一个@Setter,这个是lombok注解,提供了set重新设置这个traceId 的方法,所以会保证traceId 都是同一个的。
GlobalIdGenerator为工具类,在apm-commons里面。
TraceSegmentRef 需要建立在apm-commons项目下,后面会公用到,在apm-commons项目新建类:
com.hadluo.apm.commons.trace.TraceSegmentRef
public class TraceSegmentRef {
public enum SegmentRefType {
/*是跨进程产生的 trace segment*/
CROSS_PROCESS,
/*跨线程产生的新 trace segment*/
CROSS_THREAD
}
private SegmentRefType type;
private String traceId;
private String traceSegmentId;
// trace segment里面最后一个span id
private int spanId;
// 当前的服务名称
private String parentServiceName;
// 实例名称
private String parentServiceInstance ;
public TraceSegmentRef(ContextCarrier carrier) {
this.type = SegmentRefType.CROSS_PROCESS;
this.traceId = carrier.getTraceId();
this.traceSegmentId = carrier.getTraceSegmentId();
this.spanId = carrier.getSpanId();
this.parentServiceName = carrier.getParentServiceName();
this.parentServiceInstance = carrier.getParentServiceInstance();
}
}
这个TraceSegmentRef 就相当于TraceSegment。
之前我们讲到过,一个线程或进程里面操作就是一个TraceSegment, 如果产生一个TraceSegment必定就是跨线程或者进程操作,所以会有SegmentRefType 作为标识。其余的都是id的基本信息和服务名称等。
ContextCarrier 为跨线程或者进程传输的直接载体类,相当于一个bean对象。 当发生HTTP跨进程调用时,会把当前链路信息像traceId等,设置到http 的请求头里面,收到的服务就会解析生成ContextCarrier ,然后生成TraceSegmentRef ,就将TraceSegment进行了串联。
在apm-commons中新建类:
com.hadluo.apm.commons.trace.ContextCarrier
@Data
public class ContextCarrier {
private String traceId;
private String traceSegmentId;
// 最后一个span id
private int spanId;
private String parentServiceName;
private String parentServiceInstance;
public boolean isEmpty() {
return traceId == null || traceId.isEmpty();
}
}
isEmpty 方法可以判断出上游是否有携带数据。 想象一下,如果是请求刚进入下单接口网关,此时是没有ContextCarrier 的,到下一层之前,会把ContextCarrier 里面的数据,设置到http请求头中,然后进行跨进程传递,下一层就会构造出isEmpty为false的ContextCarrier ,从而就得到了正确的TraceSegmentRef 。
5.2.2 span的实现
Span相对复杂一点,因为有三种类型,首先我们定义一个抽象span基类,在apm-commons项目下新建类:
com.hadluo.apm.commons.trace.AbstractSpan
public interface AbstractSpan {
// 设置 当前span 的操作 的插件名称, 比如 :tomcat插件,mysql插件等
AbstractSpan setComponent(String component);
// 插件的分层 : DB层 , cache缓存层, rpc层, http层 , mq层
AbstractSpan setLayer(SpanLayer spanLayer);
// 设置操作名称
AbstractSpan setOperationName(String operationName);
// 设置 一些 tag 值
AbstractSpan setTag(String key, String value);
// 开启
AbstractSpan start();
// 结束
AbstractSpan finish();
// 获取父亲spanid
int getParentSpanId();
// 获取当前的spanId
int getSpanId();
// 设置 当前span 所在trace segment 的前一个 ref
void ref(TraceSegmentRef ref);
// 记录错误
AbstractSpan log(Throwable t);
}
以上方法都是操作span基本属性的方法,只有start和finish比较特殊。这两个方法标志span的开始和结束的一些动作。
SpanLayer 为一个枚举类, 在apm-commons目录下新建类:
com.hadluo.apm.commons.trace.SpanLayer
public enum SpanLayer {
DB(1), RPC_FRAMEWORK(2), HTTP(3), MQ(4), CACHE(5);
private int code;
SpanLayer(int code) {
this.code = code;
}
public int getCode() {
return code;
}
}
然后新建一个抽象基本功能的实现类,在apm-agent-core模块新建类:
com.hadluo.apm.agentcore.trace.AbstractTracingSpan
public abstract class AbstractTracingSpan implements AbstractSpan {
// 当前 spanId
@Setter
private int spanId;
// 上一级 的spanId
private int parentSpanId;
// 当前span操作
private String operationName;
private String componentName;
// tag
private final Map<String, String> tag = new HashMap<String, String>();
// 当前span所在的segment的 前一个segment , 当是批量线程调用时,就会是多个
protected final List<TraceSegmentRef> refs = new ArrayList<>();
// 当前span操作的分层: DB,CACHE,RPC,HTTP,MQ
private SpanLayer spanLayer;
// 开始时间
private long startTime;
// 结束时间
private long endTime;
// 错误堆栈
private Map<String, String> log = new HashMap<>();
protected AbstractTracingSpan(int spanId, int parentSpanId, String operationName) {
this.operationName = operationName;
this.spanId = spanId;
this.parentSpanId = parentSpanId;
}
@Override
public AbstractSpan setComponent(String component) {
this.componentName = component;
return this;
}
@Override
public AbstractSpan setLayer(SpanLayer spanLayer) {
this.spanLayer = spanLayer;
return this;
}
@Override
public AbstractTracingSpan setOperationName(String operationName) {
this.operationName = operationName;
return this;
}
@Override
public AbstractSpan setTag(String key, String value) {
tag.put(key, value);
return this;
}
@Override
public AbstractSpan finish() {
this.endTime = System.currentTimeMillis();
return this;
}
@Override
public AbstractSpan start() {
this.startTime = System.currentTimeMillis();
return this;
}
@Override
public int getParentSpanId() {
return parentSpanId;
}
@Override
public int getSpanId() {
return spanId;
}
@Override
public void ref(TraceSegmentRef ref) {
refs.add(ref);
}
@Override
public AbstractSpan log(Throwable t) {
log.put("time", System.currentTimeMillis() + "");
log.put("message", t.getMessage());
// 取4000长度
log.put("stack", Logs.convert2String(t, 4000));
return this;
}
}
AbstractTracingSpan抽象类很简单,就是实现了对span基本属性的设置操作,span的基本属性在前面讲解理论也提到过,这里不在赘述。
然后开始实现具体的EntrySpan和ExitSpan,我们前面理论知识提到过,一个请求进入tomcat然后到springmvc,这两个插件的EntrySpan要被复用,只有一个,并且oprationName,tag,layer等记录的是SpringMVC层的信息,而ExitSpan的模式正好相反。
这里我们通过一个模拟的栈来实现上述功能。我们假象EntrySpan,ExitSpan 都有一个stackDepth栈深度属性,EntrySpan还有一个maxStackDepth最大栈深的属性。
EntrySpan的模拟
当请求到tomcat构建出EntrySpan时,stackDepth栈深度和maxStackDepth都从0加到1,进入springmvc层又都加1,设置oprationName,tag,layer信息时,判断stackDepth==maxStackDepth时,才设置,是不是调用越深,记录的就是靠近最里层的信息,当springmvc方法返回时,stackDepth减1,但是maxStackDepth不变,这样stackDepth就不会等于maxStackDepth,就不会覆盖oprationName,tag,layer信息。
ExitSpan 的模拟
ExitSpan 只有一个stackDepth栈深度的属性,当stackDepth==1时,记录oprationName,tag,layer信息,这样就保证了记录第一次的信息。
由于两者都有一个stackDepth栈深度属性,所以还可以抽象一层基于栈的抽象类,在apm-agent-core模块新建类:
com.hadluo.apm.agentcore.trace.StackBasedTracingSpan
public abstract class StackBasedTracingSpan extends AbstractTracingSpan {
// 当前栈深度
int stackDepth;
protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName) {
super(spanId, parentSpanId, operationName);
}
// 创建span的方法返回时会调用
@Override
public AbstractSpan finish() {
if (--stackDepth == 0) {
// 减到0代表栈为空了
super.finish();
}
return this;
}
}
这个finish方法并不是代表span结束,只有stackDepth 减到0时,才代表当前span结束,前面提到过,请求从tomcat到springmvc,stackDepth 的值会加到2,在springmvc方法返回时,会调用finish,stackDepth 的值减到1,此时并不是span结束,当tomcat层的方法返回时,stackDepth 的值减到0,span才会结束,然后调用父类的finish,记录结束时间, 此时这个span就真正结束了,就该加入到trace segment里面归档了。
最后一层就是EntrySpan代码,在apm-agent-core模块新建类:
com.hadluo.apm.agentcore.trace.EntrySpan
public class EntrySpan extends StackBasedTracingSpan {
// 最大栈深,只增不减
private int maxStackDepth;
protected EntrySpan(int spanId, int parentSpanId, String operationName) {
super(spanId, parentSpanId, operationName);
}
@Override
public AbstractSpan start() {
// 当前栈深加1
stackDepth = stackDepth + 1;
// 赋值给最大栈深
maxStackDepth = stackDepth;
if(stackDepth == 1){
// 第一次进来
super.start();
}
return this;
}
@Override
public AbstractSpan setTag(String key, String value) {
// 比如:一个请求先进到Tomcat插件,然后进入到SpringMVC插件
// 进到 Tomcat 时,创建了entry span调用 start方法,stackDepth=1 , maxStackDepth=1 , 记录tag
// 在进入到SpringMvc时, 会复用span,但是会调用start方法,stackDepth=2 , maxStackDepth=2, 覆盖tag
// 出来时,调 finish,stackDepth减1 ,maxStackDepth不变, tag值不变
// 所以就记录的是 SpringMvc时的tag信息,也就是靠近里层的信息
if(maxStackDepth == stackDepth){
return super.setTag(key, value);
}
return this;
}
@Override
public AbstractSpan setLayer(SpanLayer spanLayer) {
// 同理 setTag
if(maxStackDepth == stackDepth){
return super.setLayer(spanLayer);
}
return this;
}
@Override
public AbstractTracingSpan setOperationName(String operationName) {
// 同理 setTag
if(maxStackDepth == stackDepth){
return super.setOperationName(operationName);
}
return this;
}
}
EntrySpan的实现关键就在于 maxStackDepth 和stackDepth的管理,以及判断 maxStackDepth 和stackDepth相等时,才设置有用的信息。
ExitSpan 代码,在apm-agent-core模块新建类:
com.hadluo.apm.agentcore.trace.ExitSpan
public class ExitSpan extends StackBasedTracingSpan {
protected ExitSpan(int spanId, int parentSpanId, String operationName) {
super(spanId, parentSpanId, operationName);
}
@Override
public AbstractSpan start() {
stackDepth = stackDepth + 1;
if (stackDepth == 1) {
super.start();
}
return this;
}
@Override
public AbstractSpan setLayer(SpanLayer spanLayer) {
if(stackDepth == 1){
// 只有第一次会记录
return super.setLayer(spanLayer);
}
return this;
}
@Override
public AbstractSpan setTag(String key, String value) {
if(stackDepth == 1){
// 只有第一次会记录
return super.setTag(key, value);
}
return this;
}
@Override
public AbstractTracingSpan setOperationName(String operationName) {
if(stackDepth == 1){
// 只有第一次会记录
return super.setOperationName(operationName);
}
return this;
}
}
ExitSpan 的实现关键就在于 stackDepth的管理,以及判断 stackDepth == 1时,才设置有用的信息。
Span的类型还缺一个LocalSpan , 这个比较简单,没有复用的栈逻辑。LocalSpan 代码,在apm-agent-core模块新建类:
com.hadluo.apm.agentcore.trace.LocalSpan
public class LocalSpan extends AbstractTracingSpan {
protected LocalSpan(int spanId, int parentSpanId, String operationName) {
super(spanId, parentSpanId, operationName);
}
}
其实为了逻辑流程的通用性,我们还需要一个忽略的Span类型,比如,当我们的采样率控制服务判断链路不需要采样时,为了流程的通用性,我们还是要构建一个Span,只是这个类型为一个忽略的Span类型,当发送到OAP时,我们判断这个Span类型就过滤掉就好。
忽略的Span类型代码,在apm-agent-core模块新建类:
com.hadluo.apm.agentcore.trace.LoopSpan
public class LoopSpan implements AbstractSpan {
// 里面的实现方法都是空实现,没有逻辑
}
5.3 链路上下文
上小节部分我们实现了TraceSegment和Span, 但是我们还需要一个context去管理它们。
前面说到了span内部为了保证EntrySpan记录最深一层信息,和ExitSpan记录第一层信息设计了一个类似栈的结构。其实我们的TraceSegment里面的所有Span也是一个真实的栈结构,这个栈不同于上面的说的栈,两者没有关系。
请求调用本地方法,或者rpc方法,都是一个TraceSegment里面的Span的入栈和出栈的操作,当栈为空时,证明这个TraceSegment完结,就可以发送到OAP后端了。这些Span的管理,和入栈出栈就交给我们的链路管理的上下文(AbstraceTraceContext )。这里一个AbstraceTraceContext 对应一个TraceSegment。
当然AbstraceTraceContext还有一些其他的功能,我们直接编写AbstraceTraceContext代码,在apm-commons模块新建类:
com.hadluo.apm.commons.trace.AbstraceTraceContext
public abstract class AbstraceTraceContext {
// 创建 entry span
public abstract AbstractSpan createEntrySpan(String operationName);
// 创建 local span
public abstract AbstractSpan createLocalSpan(String operationName);
// 创建 exit span
public abstract AbstractSpan createExitSpan(String operationName, String remotePeer);
/***
* 跨进程调用时, 将ContextCarrier设置到当前trace segment 的ref上
* @param carrier
*/
public abstract void extract(ContextCarrier carrier);
/**
* 创建 entry span的 链路方法 结束时调用
*/
public abstract void stopSpan();
/**
* 获取栈顶的span
* @return
*/
public abstract AbstractSpan acviveSpan();
/**
* 当前span栈是否为空
* @return
*/
public abstract boolean isEmpty() ;
}
它的实现类代码,在apm-agent-core模块新建类:
com.hadluo.apm.agentcore.trace.TracingContext
public class TracingContext extends AbstraceTraceContext {
// 对应的 trace segment
private final TraceSegment traceSegment ;
// span 栈
private final LinkedList<AbstractSpan> spanStack = new LinkedList<>();
// span id 自增器
private final AtomicInteger spanIdGenerator = new AtomicInteger(0);
// kafka发送服务
private final KafkaProducerManager kafkaProducerManager ;
public TracingContext(){
this.traceSegment = new TraceSegment() ;
kafkaProducerManager = ServiceManager.INSTANCE.getService(KafkaProducerManager.class);
}
// 出栈,但栈内元素不变
private AbstractSpan pop(){
try {
return spanStack.getLast() ;
}catch (NoSuchElementException e){
return null ;
}
}
// 入栈
private void push(AbstractSpan span){
spanStack.addLast(span);
}
public AbstractSpan createEntrySpan(String operationName) {
}
public AbstractSpan createLocalSpan(String operationName) {
}
@Override
public AbstractSpan createExitSpan(String operationName, String remotePeer) {
}
@Override
public void extract(ContextCarrier carrier) {
}
@Override
public void stopSpan() {
}
@Override
public AbstractSpan acviveSpan() {
return pop();
}
public boolean isEmpty(){
return spanStack.isEmpty();
}
}
TracingContext的核心就是维护span栈,通过LinkedList实现栈。一个TracingContext对应一个TraceSegment, 其实这个Context就是TraceSegment的辅助类。还有几个重要方法未实现单独提出来讲。
createEntrySpan方法代码如下:
public AbstractSpan createEntrySpan(String operationName) {
// 获取栈顶, 不弹出栈元素
AbstractSpan parent = pop() ;
AbstractSpan entrySpan;
// 设置 父span的id, 没有parent就 为-1, 否则就是parent的id
int parentSpanId = (parent == null?-1:parent.getSpanId()) ;
if(parent != null && parent instanceof EntrySpan){
// 这里很重要, 要复用span,两个相邻的span都是entry span 就要发生复用
parent.setOperationName(operationName);
entrySpan = parent;
return entrySpan.start();
}
// 真正创建
entrySpan = new EntrySpan( spanIdGenerator.getAndIncrement() , parentSpanId,operationName ) ;
// 入栈
push(entrySpan);
return entrySpan.start();
}
上述代码关键就在于entry span的复用,因为之前举了接口先到tomcat然后在到springmvc , 两个插件内执行时都要创建entry span ,这就是相邻两个都是entry span的场景, 要复用tomcat插件创建的entry span,而信息设置的是springmvc 插件的方法信息。
如果不是相邻的entry span,就要创建一个新的,然后入栈。
createLocalSpan方法代码如下:
public AbstractSpan createLocalSpan(String operationName) {
AbstractSpan parent = pop() ;
// 设置 父span的id, 没有parent就 为-1, 否则就是parent的id
int parentSpanId = (parent == null?-1:parent.getSpanId()) ;
LocalSpan localSpan = new LocalSpan( spanIdGenerator.getAndIncrement() , parentSpanId,operationName ) ;
// 入栈
push(localSpan);
return localSpan;
}
LocalSpan没有复用的逻辑, 直接创建一个新的,然后入栈。
createExitSpan方法代码如下:
@Override
public AbstractSpan createExitSpan(String operationName, String remotePeer) {
AbstractSpan parent = pop() ;
int parentSpanId = (parent == null?-1:parent.getSpanId()) ;
if(parent instanceof ExitSpan){
// 要复用这个span
parent.start();
return parent;
}
ExitSpan exitSpan = new ExitSpan( spanIdGenerator.getAndIncrement() , parentSpanId,operationName ) ;
exitSpan.start();
// 将远端地址设置到tag里面
exitSpan.setTag("remotePeer" , remotePeer) ;
//入栈
push(exitSpan);
return exitSpan;
}
前面提到过ExitSpan理论,当feign插件调用httpclient插件发起接口请求时,这种属于方法的嵌套(前面方法还没返回),也是相邻复用的逻辑。
如果是调redis然后调mysql,这种是不存在复用的逻辑的,因为调redis方法返回了然后在调的mysql,不属于嵌套关系,当后面用调mysql时,之前调redis的ExitSpan已经在redis方法返回时出栈了,所以parent不可能是ExitSpan。
ExitSpan还有一个特殊属性就是远端地址,比如:发起redis调用,会创建一个ExitSpan,远端地址就是redis集群的地址,然后设置到tag里面。
extract方法代码如下:
@Override
public void extract(ContextCarrier carrier) {
if(carrier.isEmpty()){
return ;
}
TraceSegmentRef ref = new TraceSegmentRef(carrier);
this.traceSegment.setRef(ref);
this.traceSegment.setTraceId(carrier.getTraceId());
AbstractSpan span = pop();
if (span instanceof EntrySpan) {
span.ref(ref);
}
}
extract 就是借助跨进程传递的ContextCarrier 对象信息生成TraceSegmentRef , 然后设置到当前TraceSegment的ref字段上,如果是entry span入口类型的,还需要添加到span的ref上。其实就是上一个trace segment的信息传递。
setTraceId这个方法相当重要,标志了一条链路上的所有trace segment的traceId一样。
stopSpan方法代码如下:
@Override
public void stopSpan() {
AbstractSpan span = pop();
// span 的结束
span.finish();
// 将span加到segment 中
this.traceSegment.addSpan(span);
// 移除栈顶
spanStack.removeLast();
if(spanStack.isEmpty()){
// 将 segment 发送到 后端
kafkaProducerManager.send(this.traceSegment.transtorm());
}
}
一个span结束后, 要将span归档到trace segment里面 , 当span栈为空时,代表这个trace segment结束,需要将数据发送到后端,但是发送的对象并不是原生的TraceSegment, 而是通过transtorm方法复制的新对象。下面我们实现下真正发送到kafka的数据对象和transtorm方法。
TraceSegment的transtorm方法代码如下:
public Segment transtorm(){
// 转换成 kafka发送的数据
Segment segment = new Segment();
segment.setTraceSegmentId(traceSegmentId);
segment.setTraceId(traceId);
segment.setSpans(new ArrayList<>());
spanList.forEach(item->segment.getSpans().add(item.transtform()));
segment.setMsgTypeClass(Segment.class.getName());
segment.setServiceName(Config.Agent.serviceName);
segment.setServiceInstance(Config.Agent.serviceInstance);
return segment ;
}
封装span时,又调用了span的transtorm,代码如下:
public Segment.Span transtform() {
Segment.Span span = new Segment.Span();
span.setSpanId(spanId);
span.setParentSpanId(parentSpanId);
span.setStartTime(startTime);
span.setEndTime(endTime);
span.setOperationName(operationName);
if (this instanceof EntrySpan) {
span.setSpanType("Entry");
} else if (this instanceof LocalSpan) {
span.setSpanType("Local");
} else {
span.setSpanType("Exit");
}
if(spanLayer != null){
span.setSpanLayer(spanLayer.toString());
}
span.setComponent(componentName);
span.setLogs(log);
span.setRefs(new ArrayList<>());
this.refs.forEach(item -> span.getRefs().add(item.transform()));
span.setTags(tag);
return span;
}
还需要在AbstractSpan上添加transtform,我就不写了。还调用了TraceSegmentRef的transform,代码如下:
public Segment.SegmentReference transform(){
Segment.SegmentReference reference = new Segment.SegmentReference();
reference.setRefType(type.toString());
reference.setTraceId(traceId);
reference.setParentTraceSegmentId(traceSegmentId);
reference.setParentSpanId(spanId);
reference.setParentService(parentServiceName);
reference.setParentServiceInstance(parentServiceInstance);
return reference ;
}
然后在apm-commons 新建kafka发送的实体,新建类:
com.hadluo.apm.commons.kafka.Segment
@Data
public class Segment extends BaseMsg{
private String traceId ;
private String traceSegmentId;
private List<Span> spans ;
@Data
public static class Span {
private int spanId;
private int parentSpanId;
private long startTime;
private long endTime;
private List<SegmentReference> refs ;
private String operationName;
private String peer;
private String spanType;
// DB(1), RPC_FRAMEWORK(2), HTTP(3), MQ(4), CACHE(5);
private String spanLayer ;
private String component;
private Map<String , String> tags ;
private Map<String , String> logs ;
}
@Data
public static class SegmentReference {
private String refType;
private String traceId;
private String parentTraceSegmentId;
private int parentSpanId ;
private String parentService;
private String parentServiceInstance;
private String networkAddressUsedAtPeer;
}
}
到此链路上下文,我们基本实现,后续还会添加新方法。当然,类似前面的LoopSpan忽略的Span逻辑,我们同样也有一个忽略的Context,在apm-agent-core项目下新建类:
com.hadluo.apm.agentcore.trace.LoopTraceContext:
public class LoopTraceContext extends AbstraceTraceContext {
private final LoopSpan INSTANCE = new LoopSpan();
@Override
public AbstractSpan createEntrySpan(String operationName) {
return INSTANCE;
}
@Override
public AbstractSpan createLocalSpan(String operationName) {
return INSTANCE;
}
@Override
public AbstractSpan createExitSpan(String operationName, String remotePeer) {
return INSTANCE;
}
@Override
public AbstractSpan acviveSpan() {
return INSTANCE;
}
}
方法也都是空的,入栈和出栈都是LoopSpan一个实例。
5.4 链路上下文管理器服务
前面说到一个上下文对应着一个TraceSegment,而TraceSegment是一个线程的所有Span操作。所以上下文跟线程绑定,这里我们把上下文放入到一个ThreadLocal中进行管理,于是我们又设计了上下文管理器(TraceContextManager) , 用来管理上下文,这个管理器还是一个BootService服务。
在apm-commoms模块中,新建类:
com.hadluo.apm.commons.trace.TraceContextManager
public class TraceContextManager implements BootService {
// 采样服务
private SamplingService samplingService;
// 持有 上下文 的 ThreadLocal
private ThreadLocal<AbstraceTraceContext> CONTEXT = new ThreadLocal<>();
// 从ThreadLocal中取 上下文
private AbstraceTraceContext getOrCreate(boolean passed) {
if (CONTEXT.get() == null) {
if(!passed){
try {
CONTEXT.set((AbstraceTraceContext) Class.forName("com.hadluo.apm.agentcore.trace.LoopTraceContext").newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
}
}else{
try {
CONTEXT.set((AbstraceTraceContext) Class.forName("com.hadluo.apm.agentcore.trace.TracingContext").newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
return CONTEXT.get();
}
public AbstractSpan createEntrySpan(String operationName, ContextCarrier contextCarrier) {
AbstraceTraceContext context ;
if (contextCarrier == null || contextCarrier.isEmpty()) {
//携带参数为空, 前面没有链路的调用
context = getOrCreate(samplingService.trySampling());
}else {
// 前面的调用链路是已经采样了, 后续的调用 也必须要采样
context = getOrCreate(true);
}
AbstractSpan span = context.createEntrySpan(operationName);
context.extract(contextCarrier);
return span;
}
public AbstractSpan createLocalSpan(String operationName) {
return getOrCreate(true).createLocalSpan(operationName);
}
public AbstractSpan createExitSpan(String operationName, String remotePeer){
return getOrCreate(true).createExitSpan(operationName,remotePeer);
}
public void stopSpan(){
AbstraceTraceContext context = CONTEXT.get();
context.stopSpan();
if(context.isEmpty()){
// 栈已经是空的了,需要将线程变量移除
CONTEXT.remove();
}
}
public AbstractSpan activeSpan(){
return CONTEXT.get().acviveSpan();
}
@Override
public void prepare() throws Throwable {
this.samplingService = ServiceManager.INSTANCE.getService(SamplingService.class);
}
}
TraceContextManager 维护了线程ThreadLocal ,代理了上下文的创建span的几个方法。
createEntrySpan 需要单独说明下,ContextCarrier 参数是跨进程或跨线程传递的上一级TraceSegment的信息,在之前实现TraceSegment的时候已经实现过。判断如果是携带ContextCarrier 参数的,代表前面的TraceSegment已经存在,则后面的TraceSegment必须要采样(链路不能断掉)。
context.extract方法就是将ContextCarrier的值设置到 TraceSegmentRef ,这个 TraceSegmentRef 就是指向上一级的TraceSegment信息,之前也提到过。
5.5 本章小结
本章是整个链路监控的数据结构核心,通过对链路的抽象将链路划分为具体的TraceSegment,Span等。他们的结构如图(图片摘抄于网络):
本章还介绍了EntrySpan的类似栈设计,通过stackDepth(当前栈深度)和maxStackDepth(最大栈深度)灵活的控制了怎么保证记录靠近调用内测的信息。
本章还介绍了链路的上下文AbstraceTraceContext,一个上下文对应一个TraceSegment, 通过这个上下文对所属的所有Span也进行了栈管理,这是一个真实先进后出的栈,值得注意的是入栈时,需要保证EntrySpan和ExitSpan的复用对象逻辑。出栈时,当栈为空,表示这个TraceSegment已经结束需要归档发送到kafka。
由于TraceSegment属于线程里面的操作,所以还创作了一个基于ThreadLocal的下文的管理器服务TraceContextManager,这个服务通过线程ThreadLocal来管理链路的上下文AbstraceTraceContext。
本章没有进行代码测试,因为可能一些细节后续还要修改,等后面介绍具体的插桩插件如何调用TraceContextManager的创建Span方法,传递怎样的参数,再进行修改完善代码和测试。