How Spring Cloud Sleuth Works


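Before getting into the details, consider the following questions:

  • How does the client generate span information?
  • How does the client pass the span to the server when it calls a service?
    • RestTemplate
    • Feign
    • WebClient
  • How does the server receive the span sent by the client?
  • How do the client and the server report spans to Zipkin?
  • How is the span propagated across thread pools and between threads?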

With these questions in mind, let's explore them one by one.

How the client generates span information, and how the server receives the span sent by the client

Both are mainly handled by the TracingFilter that LazyTracingFilter delegates to:

// TracingFilter
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
        throws IOException, ServletException {
    HttpServletRequest req = (HttpServletRequest) request;
    HttpServletResponse res = servlet.httpServletResponse(response);

    // Prevent duplicate spans for the same request
    TraceContext context = (TraceContext) request.getAttribute(TraceContext.class.getName());
    if (context != null) {
        // A forwarded request might end up on another thread, so make sure it is
        // scoped
        CurrentTraceContext.Scope scope = currentTraceContext.maybeScope(context);
        try {
            chain.doFilter(request, response);
        } finally {
            scope.close();
        }
        return;
    }

    Span span = handler.handleReceive(new HttpServletRequestWrapper(req));

    // Add attributes for explicit access to customization or span context
    request.setAttribute(SpanCustomizer.class.getName(), span);
    request.setAttribute(TraceContext.class.getName(), span.context());
    SendHandled sendHandled = new SendHandled();
    request.setAttribute(SendHandled.class.getName(), sendHandled);

    Throwable error = null;
    CurrentTraceContext.Scope scope = currentTraceContext.newScope(span.context());
    try {
        // any downstream code can see Tracer.currentSpan() or use
        // Tracer.currentSpanCustomizer()
        chain.doFilter(req, res);
    } catch (Throwable e) {
        error = e;
        throw e;
    } finally {
        // When async, even if we caught an exception, we don't have the final
        // response: defer
        if (servlet.isAsync(req)) {
            servlet.handleAsync(handler, req, res, span);
        } else if (sendHandled.compareAndSet(false, true)) {
            // we have a synchronous response or error: finish the span
            HttpServerResponse responseWrapper = HttpServletResponseWrapper.create(req, res, error);
            handler.handleSend(responseWrapper, span);
        }
        scope.close();
    }
}

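  • If the request headers carry X-B3-TraceId, X-B3-SpanId and X-B3-ParentSpanId, handler.handleReceive extracts that trace context and it is propagated downstream
  • If they are absent, a new span is generated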
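
The two cases above can be illustrated with a small JDK-only sketch. It is not Brave's or Sleuth's code: SpanContext, B3Extractor and extractOrCreate are hypothetical names, the header map is assumed to use the exact header spelling shown above (real HTTP headers are case-insensitive), and reusing the caller's span id on the server side is an assumption of this sketch.

```java
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;

// Hypothetical value object holding the ids carried by the B3 headers.
final class SpanContext {
    final String traceId;
    final String spanId;
    final String parentSpanId; // null for a root span

    SpanContext(String traceId, String spanId, String parentSpanId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
    }
}

// Hypothetical helper that models the "continue or create" decision.
final class B3Extractor {
    private static final SecureRandom RANDOM = new SecureRandom();

    // Random 64-bit id rendered as 16 lower-case hex characters.
    static String newId() {
        return String.format("%016x", RANDOM.nextLong());
    }

    // Continue the caller's trace when both ids are present, otherwise start a new one.
    static SpanContext extractOrCreate(Map<String, String> headers) {
        String traceId = headers.get("X-B3-TraceId");
        String spanId = headers.get("X-B3-SpanId");
        if (traceId != null && spanId != null) {
            return new SpanContext(traceId, spanId, headers.get("X-B3-ParentSpanId"));
        }
        String id = newId(); // assumption: a root span uses the same value for both ids
        return new SpanContext(id, id, null);
    }

    public static void main(String[] args) {
        Map<String, String> incoming = new HashMap<>();
        incoming.put("X-B3-TraceId", "463ac35c9f6413ad");
        incoming.put("X-B3-SpanId", "a2fb4a1d1a96d312");
        SpanContext joined = extractOrCreate(incoming);
        System.out.println("continued trace: " + joined.traceId);

        SpanContext fresh = extractOrCreate(new HashMap<>());
        System.out.println("new trace: " + fresh.traceId);
    }
}
```

In the real filter this decision is hidden inside handler.handleReceive; the sketch only makes the branching visible.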

How does the client pass the span when it calls the server?

RestTemplate as the client

The obvious approach is to add a ClientHttpRequestInterceptor, and that is exactly what Spring does. It adds TracingClientHttpRequestInterceptor:

@Override
public ClientHttpResponse intercept(HttpRequest req, byte[] body,
        ClientHttpRequestExecution execution) throws IOException {
    HttpRequestWrapper request = new HttpRequestWrapper(req);
    Span span = handler.handleSend(request);
    if (log.isDebugEnabled()) {
        log.debug("Wrapping an outbound http call with span [" + span + "]");
    }
    ClientHttpResponse response = null;
    Throwable error = null;
    try (CurrentTraceContext.Scope ws = currentTraceContext.newScope(span.context())) {
        response = execution.execute(req, body);
        return response;
    } catch (Throwable e) {
        error = e;
        throw e;
    } finally {
        handler.handleReceive(new ClientHttpResponseWrapper(request, response, error), span);
    }
}

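As the code shows, the request is wrapped and the span information is written into the request headers. On the server side these headers are parsed and the span continues to be propagated downstream.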
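
What "writing the span information into the request headers" amounts to can be sketched without Spring or Brave. The names B3Injector, Ids and inject are hypothetical, and a plain Map stands in for the HTTP headers; the sketch assumes the outgoing call gets a fresh span id whose parent is the span currently active on the client.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper that models header injection on the client side.
final class B3Injector {

    // Hypothetical holder for the ids of a span.
    static final class Ids {
        final String traceId;
        final String spanId;

        Ids(String traceId, String spanId) {
            this.traceId = traceId;
            this.spanId = spanId;
        }
    }

    // Random 64-bit id rendered as 16 lower-case hex characters.
    static String newSpanId() {
        return String.format("%016x", ThreadLocalRandom.current().nextLong());
    }

    // Creates a child of `current` for the outgoing call and writes its ids into `headers`.
    static Ids inject(Ids current, Map<String, String> headers) {
        Ids child = new Ids(current.traceId, newSpanId());
        headers.put("X-B3-TraceId", child.traceId);       // same trace as the caller
        headers.put("X-B3-SpanId", child.spanId);         // new id for this call
        headers.put("X-B3-ParentSpanId", current.spanId); // links the call to the caller's span
        return child;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        inject(new Ids("463ac35c9f6413ad", "a2fb4a1d1a96d312"), headers);
        headers.forEach((name, value) -> System.out.println(name + ": " + value));
    }
}
```

The trace id stays constant across services, while each hop contributes its own span id; that is what lets the collected spans be assembled into one tree.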



FeignClient as the client

todo…

WebClient as the client

todo…

How is the span propagated across thread pools and between threads?

  • When the container starts, Spring Cloud Sleuth registers the ExecutorBeanPostProcessor bean post-processor
  • ExecutorBeanPostProcessor wraps the various kinds of thread pools
// ExecutorBeanPostProcessor
public class ExecutorBeanPostProcessor implements BeanPostProcessor {

    private static final Log log = LogFactory.getLog(ExecutorBeanPostProcessor.class);

    private final BeanFactory beanFactory;

    private SleuthAsyncProperties sleuthAsyncProperties;

    public ExecutorBeanPostProcessor(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (!ExecutorInstrumentor.isApplicableForInstrumentation(bean)) {
            return bean;
        }
        return new ExecutorInstrumentor(() -> sleuthAsyncProperties().getIgnoredBeans(), this.beanFactory)
                .instrument(bean, beanName);
    }

    private SleuthAsyncProperties sleuthAsyncProperties() {
        if (this.sleuthAsyncProperties == null) {
            this.sleuthAsyncProperties = this.beanFactory.getBean(SleuthAsyncProperties.class);
        }
        return this.sleuthAsyncProperties;
    }
}

Next, look at the ExecutorInstrumentor.instrument method:

public Object instrument(Object bean, String beanName) {
    if (!isApplicableForInstrumentation(bean)) {
        log.info("Bean is already instrumented or is not applicable for instrumentation " + beanName);
        return bean;
    }
    if (bean instanceof ThreadPoolTaskExecutor) {
        if (isProxyNeeded(beanName)) {
            return wrapThreadPoolTaskExecutor(bean, beanName);
        } else {
            log.info("Not instrumenting bean " + beanName);
        }
    } else if (bean instanceof ScheduledExecutorService) {
        if (isProxyNeeded(beanName)) {
            return wrapScheduledExecutorService(bean, beanName);
        } else {
            log.info("Not instrumenting bean " + beanName);
        }
    } else if (bean instanceof ExecutorService) {
        if (isProxyNeeded(beanName)) {
            return wrapExecutorService(bean, beanName);
        } else {
            log.info("Not instrumenting bean " + beanName);
        }
    } else if (bean instanceof AsyncTaskExecutor) {
        if (isProxyNeeded(beanName)) {
            return wrapAsyncTaskExecutor(bean, beanName);
        } else {
            log.info("Not instrumenting bean " + beanName);
        }
    } else if (bean instanceof Executor) {
        return wrapExecutor(bean, beanName);
    }
    return bean;
}

Object createThreadPoolTaskExecutorProxy(Object bean, boolean cglibProxy,
        ThreadPoolTaskExecutor executor, String beanName) {
    if (!cglibProxy) {
        return LazyTraceThreadPoolTaskExecutor.wrap(this.beanFactory, executor, beanName);
    }
    return getProxiedObject(bean, beanName, true, executor,
            () -> LazyTraceThreadPoolTaskExecutor.wrap(this.beanFactory, executor, beanName));
}

// LazyTraceThreadPoolTaskExecutor.java
private Runnable wrap(Runnable runnable) {
    if (runnable instanceof TraceRunnable) {
        return runnable;
    }
    return ContextUtil.isContextUnusable(this.beanFactory) ? runnable
            : new TraceRunnable(tracer(), spanNamer(), runnable, this.beanName);
}

private <V> Callable<V> wrap(Callable<V> callable) {
    if (callable instanceof TraceCallable) {
        return callable;
    }
    return ContextUtil.isContextUnusable(this.beanFactory) ? callable
            : new TraceCallable<>(tracer(), spanNamer(), callable, this.beanName);
}

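As the code shows, each kind of thread pool is wrapped in a tracing counterpart. A ThreadPoolTaskExecutor, for example, becomes a LazyTraceThreadPoolTaskExecutor, which passes the span information along when a task is submitted by wrapping the task in a TraceRunnable or TraceCallable.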
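
The wrapping pattern itself is a plain decorator and can be sketched with the JDK alone. WrappingExecutor is a hypothetical name, not a Sleuth class; it assumes the task-wrapping step is supplied as a function, whereas the Sleuth wrappers hard-code it to TraceRunnable and TraceCallable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.UnaryOperator;

// Hypothetical decorator: every task passes through `wrapper` before it reaches the real executor.
final class WrappingExecutor implements Executor {
    private final Executor delegate;
    private final UnaryOperator<Runnable> wrapper;

    WrappingExecutor(Executor delegate, UnaryOperator<Runnable> wrapper) {
        this.delegate = delegate;
        this.wrapper = wrapper;
    }

    @Override
    public void execute(Runnable task) {
        // The caller submits a plain task; the delegate only ever sees the wrapped one.
        delegate.execute(wrapper.apply(task));
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        // Runnable::run is a same-thread executor, used here to keep the demo deterministic.
        Executor executor = new WrappingExecutor(Runnable::run, task -> () -> {
            events.add("open span");
            try {
                task.run();
            } finally {
                events.add("close span");
            }
        });
        executor.execute(() -> events.add("business logic"));
        System.out.println(events);
    }
}
```

Because application code keeps calling execute on what looks like an ordinary Executor, the instrumentation stays invisible to it, which is why a bean post-processor is a convenient place to swap the wrapper in.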

Task wrapping:
public class TraceRunnable implements Runnable {

    /**
     * Since we don't know the exact operation name we provide a default name for the
     * Span.
     */
    private static final String DEFAULT_SPAN_NAME = "async";

    private final Tracer tracer;

    private final Runnable delegate;

    private final Span parent;

    private final String spanName;

    public TraceRunnable(Tracer tracer, SpanNamer spanNamer, Runnable delegate) {
        this(tracer, spanNamer, delegate, null);
    }

    public TraceRunnable(Tracer tracer, SpanNamer spanNamer, Runnable delegate, String name) {
        this.tracer = tracer;
        this.delegate = delegate;
        this.parent = tracer.currentSpan();
        this.spanName = name != null ? name : spanNamer.name(delegate, DEFAULT_SPAN_NAME);
    }

    @Override
    public void run() {
        Span childSpan = SleuthAsyncSpan.ASYNC_RUNNABLE_SPAN.wrap(this.tracer.nextSpan(this.parent))
                .name(this.spanName);
        try (Tracer.SpanInScope ws = this.tracer.withSpan(childSpan.start())) {
            this.delegate.run();
        } catch (Exception | Error e) {
            childSpan.error(e);
            throw e;
        } finally {
            childSpan.end();
        }
    }
}

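The constructor captures the current span on the submitting thread, and run() starts a child span from it on the worker thread, so the span information is carried across threads as well.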
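
The capture-then-restore idea behind TraceRunnable can be reduced to a ThreadLocal and a wrapper. The sketch below is a simplified model under stated assumptions: ContextCapturingRunnable and CURRENT_TRACE_ID are hypothetical names, a String stands in for the span, and no child span is created.

```java
// Hypothetical wrapper that carries a thread-bound value from the submitting thread to the worker.
final class ContextCapturingRunnable implements Runnable {

    // Hypothetical stand-in for the tracer's "current span" storage.
    static final ThreadLocal<String> CURRENT_TRACE_ID = new ThreadLocal<>();

    private final Runnable delegate;
    private final String captured;

    ContextCapturingRunnable(Runnable delegate) {
        this.delegate = delegate;
        // The constructor runs on the submitting thread, so this reads the caller's context.
        this.captured = CURRENT_TRACE_ID.get();
    }

    @Override
    public void run() {
        // run() executes on the worker thread: install the captured value, then restore.
        String previous = CURRENT_TRACE_ID.get();
        CURRENT_TRACE_ID.set(captured);
        try {
            delegate.run();
        } finally {
            if (previous == null) {
                CURRENT_TRACE_ID.remove(); // avoid leaking context into pooled threads
            } else {
                CURRENT_TRACE_ID.set(previous);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CURRENT_TRACE_ID.set("463ac35c9f6413ad");
        Runnable task = new ContextCapturingRunnable(
                () -> System.out.println("worker sees " + CURRENT_TRACE_ID.get()));
        Thread worker = new Thread(task);
        worker.start();
        worker.join();
    }
}
```

The important detail is where each read happens: a ThreadLocal read inside run() without the capture step would return the worker thread's own (usually empty) value.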

