ThreadLocal跨线程问题

Jill ·
更新时间:2024-11-13
· 635 次阅读

1、问题

通常复杂的处理流程中,我们会使用一些异步处理的手段,那么这种场景下ThreadLocal即可能出现获取失败的问题。

public class ThreadLocalTest { public static void main(String[] args) { ThreadLocal threadLocal = new ThreadLocal(); threadLocal.set("A"); System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); }).start(); } }

在这里插入图片描述

2、InheritableThreadLocal

直接使用ThreadLocal,在跨线程时时无法获取到ThreadLocal的。
在lang包还有一个继承自ThreadLocal的类InheritableThreadLocal

public class InheritableThreadLocal extends ThreadLocal { /** * Computes the child's initial value for this inheritable thread-local * variable as a function of the parent's value at the time the child * thread is created. This method is called from within the parent * thread before the child is started. *

* This method merely returns its input argument, and should be overridden * if a different behavior is desired. * * @param parentValue the parent thread's value * @return the child thread's initial value */ protected T childValue(T parentValue) { return parentValue; } /** * Get the map associated with a ThreadLocal. * * @param t the current thread */ ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } /** * Create the map associated with a ThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the table. */ void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } }

重写了getMap和createMap方法,让每次get、set操作都是对inheritableThreadLocals进行操作。
在Thread的init方法中有个判断,若是父线程的inheritableThreadLocals不为空,则将其复制到子线程。
在这里插入图片描述
也就是说我们使用InheritableThreadLocal,只要新建线程就可以让ThreadLocal在子父线程之间传递。

public class ThreadLocalTest { public static void main(String[] args) { ThreadLocal threadLocal = new InheritableThreadLocal(); threadLocal.set("A"); System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); }).start(); }).start(); } }

在这里插入图片描述

3、线程池复用Thread导致的问题

并不代表InheritableThreadLocal就可以保证子父线程之间正确传递ThreadLocal对象。inheritableThreadLocals的复制操作只有在新创建Thread对象的时候才会触发。而我们通常不会在项目中new Thread,而是使用线程池,线程池的Thread对象是复用的。
将上面的例子改成使用线程池
使用固定大小的线程池,当固定大小为2时没问题,两次使用的不是同一个Thread,每次都成功复制了正确的ThreadLocal。

public class ThreadLocalTest1 { public static void main(String[] args) throws InterruptedException { Executor threadPool = Executors.newFixedThreadPool(2); ThreadLocal threadLocal = new InheritableThreadLocal(); threadLocal.set("A"); System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); } }); threadLocal.set("B"); Thread.sleep(200); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); } }); } }

在这里插入图片描述
而将线程池固定大小设为1时会发现,使用同一个Thread对象,第二次并不会触发重新复制ThreadLocal对象,还是以前的A。

public class ThreadLocalTest1 { public static void main(String[] args) throws InterruptedException { Executor threadPool = Executors.newFixedThreadPool(1); ThreadLocal threadLocal = new InheritableThreadLocal(); threadLocal.set("A"); System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); } }); threadLocal.set("B"); Thread.sleep(200); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); } }); } }

在这里插入图片描述

4、解决办法

阿里的TransmittableThreadLocal
引入transmittable-thread-local依赖

com.alibaba transmittable-thread-local 2.11.4

使用TransmittableThreadLocal继续改造上面的例子

public class ThreadLocalTest2 { public static void main(String[] args) throws InterruptedException { Executor threadPool = TtlExecutors.getTtlExecutor(Executors.newFixedThreadPool(1)); ThreadLocal threadLocal = new TransmittableThreadLocal(); threadLocal.set("A"); System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); } }); threadLocal.set("B"); Thread.sleep(200); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"-getThreadLocal:"+threadLocal.get()); } }); } }

在这里插入图片描述
两个关键点:
(1)需要使用transmittable-thread-local包中的方法包装线程池,否则即使使用TransmittableThreadLocal也不会起作用
(2)使用TransmittableThreadLocal对象来保存线程变量。
简单分析一下这个东东:
TransmittableThreadLocal继承自InheritableThreadLocal
在这里插入图片描述
TransmittableThreadLocal中有个holder的静态对象,对ThreadLocal进行get、set、remove等操作时其实是在对这个holder进行操作。用到了WeakHashMap,这个其实和JDK1.8以前的老版hashmap很相似,不同的是Entry对象是使用WeakReference包装的,这应该个原生ThreadLocal中的ThreadLocalMap使用WeakReference是一样的道理。(ThreadLocal为什么使用WeakReference)

private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal, ?>>() { protected WeakHashMap<TransmittableThreadLocal, ?> initialValue() { return new WeakHashMap(); } protected WeakHashMap<TransmittableThreadLocal, ?> childValue(WeakHashMap<TransmittableThreadLocal, ?> parentValue) { return new WeakHashMap(parentValue); } };

然后看包装线程池是在干什么

@Nullable public static Executor getTtlExecutor(@Nullable Executor executor) { return (Executor)(!TtlAgent.isTtlAgentLoaded() && null != executor && !(executor instanceof TtlEnhanced)?new ExecutorTtlWrapper(executor):executor); }

返回了一个ExecutorTtlWrapper对象,在调用execute时将Runnable对象换成了TtlRunnable。

class ExecutorTtlWrapper implements Executor, TtlWrapper, TtlEnhanced { private final Executor executor; ExecutorTtlWrapper(@NonNull Executor executor) { this.executor = executor; } public void execute(@NonNull Runnable command) { this.executor.execute(TtlRunnable.get(command)); } @NonNull public Executor unwrap() { return this.executor; } }

TtlRunnable中改造了原生Runnable的run方法

public final class TtlRunnable implements Runnable, TtlWrapper, TtlEnhanced, TtlAttachments { private final AtomicReference capturedRef = new AtomicReference(Transmitter.capture()); private final Runnable runnable; private final boolean releaseTtlValueReferenceAfterRun; private final TtlAttachmentsDelegate ttlAttachment = new TtlAttachmentsDelegate(); private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { this.runnable = runnable; this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; } public void run() { Object captured = this.capturedRef.get(); if(captured != null && (!this.releaseTtlValueReferenceAfterRun || this.capturedRef.compareAndSet(captured, (Object)null))) { Object backup = Transmitter.replay(captured); try { this.runnable.run(); } finally { Transmitter.restore(backup); } } else { throw new IllegalStateException("TTL value reference is released after run!"); } } ...... }

Transmitter.capture()其实就是在获取TransmittableThreadLocal中holder的副本,对该副本使用AtomicReference进行包装,方便保证原子性。
调用run时,从capturedRef获取副本,使用cas将其更新为null,若cas失败则抛出异常IllegalStateException(“TTL value reference is released after run!”),使用cas来保证每次取到的都是最新的副本。
因此包装线程池的作用就是将以前每次新建Thread对象才拷贝inheritableThreadLocals的机制变成了每次新建Runnable的时候拷贝副本,从而保证线程池中子父线程之间ThreadLocal对象的传递。

这种在子父线程之间传递上下文的操作其实在很多框架中都有,如sleuth:
以前一直很好奇,为什么异步调用时调用链路依然是完整的,很神奇。sleuth中使用一个叫ExecutorBeanPostProcessor的后置处理器包装了所有的线程池,例如将ThreadPoolTaskExecutor包装为LazyTraceThreadPoolTaskExecutor。LazyTraceThreadPoolTaskExecutor重写了线程池的execute、submit等方法,当我们传入参数Runnable或者Callable时会被包装为TraceRunnable或TraceCallable。


作者:小方好方



threadlocal 线程

需要 登录 后方可回复, 如果你还没有账号请 注册新账号