分布式系統(tǒng)中日志追蹤需要考慮的幾個(gè)點(diǎn)?
- 需要一個(gè)全服務(wù)唯一的id,即traceId,如何保證?
- traceId如何在服務(wù)間傳遞?
- traceId如何在服務(wù)內(nèi)部傳遞?
- traceId如何在多線程中傳遞?
我們一一來(lái)解答:
- 全服務(wù)唯一的traceId,可以使用uuid生成,正常來(lái)說(shuō)不會(huì)出現(xiàn)重復(fù)的;
- 關(guān)于服務(wù)間傳遞,對(duì)于調(diào)用者,在協(xié)議頭加上traceId,對(duì)于被調(diào)用者,通過(guò)前置攔截器或者過(guò)濾器統(tǒng)一攔截;
- 關(guān)于服務(wù)內(nèi)部傳遞,可以使用ThreadLocal傳遞traceId,一處放置,隨處可用;
- 關(guān)于多線程傳遞,分為兩種情況:子線程,可以使用InheritableThreadLocal線程池,需要改造線程池對(duì)提交的任務(wù)進(jìn)行包裝,把提交者的traceId包裝到任務(wù)中

比如,上面這個(gè)系統(tǒng),系統(tǒng)入口在A處,A調(diào)用B的服務(wù),B里面又起了一個(gè)線程B1去訪問(wèn)D的服務(wù),B本身又去訪問(wèn)C服務(wù)。
我們就可以這么來(lái)跟蹤日志:
- 所有服務(wù)都需要一個(gè)全局的InheritableThreadLocal保存服務(wù)內(nèi)部traceId的傳遞;
- 所有服務(wù)都需要一個(gè)前置攔截器或者過(guò)濾器,檢測(cè)如果請(qǐng)求頭沒(méi)有traceId就生成一個(gè),如果有就取出來(lái),并把traceId放到全局的InheritableThreadLocal里面;
- 一個(gè)服務(wù)調(diào)用另一個(gè)服務(wù)的時(shí)候把traceId塞到請(qǐng)求頭里,比如http header,本文來(lái)源于工從號(hào)彤哥讀源碼;
- 改造線程池,在提交的時(shí)候包裝任務(wù),這個(gè)工作量比較大,因?yàn)榉?wù)內(nèi)部可能依賴(lài)其它框架,這些框架的線程池有可能也需要修改;
實(shí)現(xiàn)
我們模擬A到B這兩個(gè)服務(wù)來(lái)實(shí)現(xiàn)一個(gè)日志跟蹤系統(tǒng)。
為了簡(jiǎn)單起見(jiàn),我們使用SpringBoot,它默認(rèn)使用的日志框架是logback,而且Slf4j提供了一個(gè)包裝了InheritableThreadLocal的類(lèi)叫MDC,我們只要把traceId放在MDC中,打印日志的時(shí)候統(tǒng)一打印就可以了,不用顯式地打印traceId。
我們分成三個(gè)模塊:
- 公共包:封裝攔截器,traceId的生成,服務(wù)內(nèi)傳遞,請(qǐng)求頭的傳遞等;
- A服務(wù):只依賴(lài)于公共包,并提供一個(gè)接口接收外部請(qǐng)求;
- B服務(wù):依賴(lài)于公共包,并內(nèi)部起一個(gè)線程池,用于發(fā)送B1->D的請(qǐng)求,當(dāng)然我們這里不發(fā)送請(qǐng)求,只在線程池中簡(jiǎn)單地打印一條日志;
公共包
- TraceFilter.JAVA
前置過(guò)濾器,用攔截器實(shí)現(xiàn)也是一樣的。
從請(qǐng)求頭中獲取traceId,如果不存在就生成一個(gè),并放入MDC中。
@Slf4j
@WebFilter("/**")
@Component
public class TraceFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain chain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) servletRequest;
// 從請(qǐng)求頭中獲取traceId
String traceId = request.getHeader("traceId");
// 不存在就生成一個(gè)
if (traceId == null || "".equals(traceId)) {
traceId = UUID.randomUUID().toString();
}
// 放入MDC中,本文來(lái)源于工從號(hào)彤哥讀源碼
MDC.put("traceId", traceId);
chain.doFilter(servletRequest, servletResponse);
}
@Override
public void destroy() {
}
}
- TraceThreadPoolExecutor.java
改造線程池,提交任務(wù)的時(shí)候進(jìn)行包裝。
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
public void execute(Runnable command) {
// 提交者的本地變量
Map<string, string> contextMap = MDC.getCopyOfContextMap();
super.execute(()->{
if (contextMap != null) {
// 如果提交者有本地變量,任務(wù)執(zhí)行之前放入當(dāng)前任務(wù)所在的線程的本地變量中
MDC.setContextMap(contextMap);
}
try {
command.run();
} finally {
// 任務(wù)執(zhí)行完,清除本地變量,以防對(duì)后續(xù)任務(wù)有影響
MDC.clear();
}
});
}
}
- TraceAsyncConfigurer.java
改造Spring的異步線程池,包裝提交的任務(wù)。
@Slf4j
@Component
public class TraceAsyncConfigurer implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-pool-");
executor.setTaskDecorator(new MdcTaskDecorator());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (throwable, method, params) -> log.error("asyc execute error, method={}, params={}", method.getName(), Arrays.toString(params));
}
public static class MdcTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
Map<string, string> contextMap = MDC.getCopyOfContextMap();
return () -> {
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
try {
runnable.run();
} finally {
MDC.clear();
}
};
}
}
}
- HttpUtils.java
封裝Http工具類(lèi),把traceId加入頭中,帶到下一個(gè)服務(wù)。
@Slf4j
public class HttpUtils {
public static String get(String url) throws URISyntaxException {
RestTemplate restTemplate = new RestTemplate();
MultiValueMap<string, string> headers = new HttpHeaders();
headers.add("traceId", MDC.get("traceId"));
URI uri = new URI(url);
RequestEntity<!--?--> requestEntity = new RequestEntity<>(headers, HttpMethod.GET, uri);
ResponseEntity<string> exchange = restTemplate.exchange(requestEntity, String.class);
if (exchange.getStatusCode().equals(HttpStatus.OK)) {
log.info("send http request success");
}
return exchange.getBody();
}
}
A服務(wù)
A服務(wù)通過(guò)Http調(diào)用B服務(wù)。
@Slf4j
@RestController
public class AController {
@RequestMApping("a")
public String a(String name) {
log.info("Hello, " + name);
try {
// A中調(diào)用B
return HttpUtils.get("http://localhost:8002/b");
} catch (Exception e) {
log.error("call b error", e);
}
return "fail";
}
}
A服務(wù)的日志輸出格式:
中間加了[%X{traceId}]一串表示輸出traceId。
# 本文來(lái)源于工從號(hào)彤哥讀源碼
logging:
pattern:
console: '%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr([%X{traceId}]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx'
B服務(wù)
B服務(wù)內(nèi)部有兩種跨線程調(diào)用:
- 利用Spring的異步線程池
- 使用自己的線程池
BController.java
@Slf4j
@RestController
public class BController {
@Autowired
private BService bService;
@RequestMapping("b")
public String b() {
log.info("Hello, b receive request from a");
bService.sendMsgBySpring();
bService.sendMsgByThreadPool();
return "ok";
}
}
BService.java
@Slf4j
@Service
public class BService {
public static final TraceThreadPoolExecutor threadPool = new TraceThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
@Async
public void sendMsgBySpring() {
log.info("send msg by spring success");
}
public void sendMsgByThreadPool() {
threadPool.execute(()->log.info("send msg by thread pool success"));
}
}
B服務(wù)的日志輸出格式:
中間加了[%X{traceId}]一串表示輸出traceId。
logging:
pattern:
console: '%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr([%X{traceId}]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx'
測(cè)試
打開(kāi)瀏覽器,輸入http://localhost:8001/a?name=andy。
A服務(wù)輸出日志:
2019-12-26 21:36:29.132 INFO 5132 --- [nio-8001-exec-2] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.a.AController : Hello, andy
2019-12-26 21:36:35.380 INFO 5132 --- [nio-8001-exec-2] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.common.HttpUtils : send http request success
B服務(wù)輸出日志:
2019-12-26 21:36:29.244 INFO 2368 --- [nio-8002-exec-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BController : Hello, b receive request from a
2019-12-26 21:36:29.247 INFO 2368 --- [nio-8002-exec-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService
2019-12-26 21:36:35.279 INFO 2368 --- [ async-pool-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BService : send msg by spring success
2019-12-26 21:36:35.283 INFO 2368 --- [pool-1-thread-1] [8a59cb96-bbc8-42a9-aa62-df7a52875447] com.alan.trace.b.BService : send msg by thread pool success
可以看到,A服務(wù)成功生成了traceId,并且傳遞給了B服務(wù),且B服務(wù)線程間可以保證同一個(gè)請(qǐng)求的traceId是可以傳遞的。
文章來(lái)源:https://my.oschina.net/u/4108008/blog/3152201
關(guān)注我了解更多程序員資訊技術(shù),領(lǐng)取豐富架構(gòu)資料