ThreadLocal系列源码分析

ThreadLocal系列源码分析

ThreadLocal 源码分析:

直接看 ThreadLocal##get() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class ThreadLocal {
public T get() {
// 1. 获取当前线程
Thread t = Thread.currentThread();
// 2. 获取线程对象中的属性 threadLocals表
ThreadLocalMap map = getMap(t);
if (map != null) {
// 3. 获取Entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
// 4. 返回value值,即 ThreadLocal 存储的线程相关的值
T result = (T) e.value;
return result;
}
}
// 5. 如果 map 为空,则设置并返回初始值
return setInitialValue();
}

/**
* 每个线程都保存自己的一份 ThreadLocal 表
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

/**
* 设置并返回初始值
*/
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}

/**
* 设置初始值
*/
protected T initialValue() {
return null;
}
}

再看 ThreadLocal##set()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class ThreadLocal{
public void set(T value) {
// 1. 获取当前线程
Thread t = Thread.currentThread();
// 2. 获取线程对象中的属性 ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
}

每个线程都保存自己的一份 ThreadLocal 表,这个表长啥样?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Thread {
ThreadLocal.ThreadLocalMap threadLocals = null;
}

class ThreadLocal {
static class ThreadLocalMap {
// 定制了自己的 hash 算法,每个Entry都可定位到指定的索引
private Entry[] table;

static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}
}

WeakReference<ThreadLocal<?>> 表示只有一个 ThreadLocal 引用,但是这个引用比较特殊

ThreadLocal 为什么要使用 WeakReference?

当发生 GC 时,WeakReference 中持有的引用便会被回收,这样的意义是:当 ThreadLocal 置为 Null 时,Thread 中的 map 自动进行回收(只有 map 中的 key 会自动回收)。

WeakReference 应用场景?

  1. map 自动回收
  2. 缓存自动释放

WeakReference 内的引用如果被回收了,自身如何存在?

自身不会被回收,但是会被放到 ReferenceQuene 中(如果有初始化),发生回收后,可以遍历 ReferenceQuene 来进行后续操作。

feign 中使用 RequestInterceptor 遇到的一个问题:

当 Hystrix资源隔离策略 为线程模式时,RequestInterceptor 会变成异步执行,这时候 RequestContextHolder.getRequestAttributes() 拿到的结果会为 Null ,这是因为 RequestContextHolder 基于 ThreadLocal。

如何解决上述问题?

使用 InheritableThreadLocal ,该类可以实现父线程的 ThreadLocalMap 传递到子线程的 ThreadLocalMap中。

InheritableThreadLocal 源码分析

ThreadLocalMap 的传递发生在新线程创建时,那么可以从 Thread 的初始化方法入手:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Thread {

// 除了有 ThreadLocal 表 还有 inheritableThreadLocals 表
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

private void init() {
// ...忽略无关代码
if (inheritThreadLocals && parent.inheritableThreadLocals != null){
// 这里把父线程中的 inheritableThreadLocals表 传递到子线程中的 inheritableThreadLocals表
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
}
}
}

再看 ThreadLocal##createInheritedMap()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class ThreadLocal {
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
static class ThreadLocalMap {
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];

for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
// 核心方法
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}
}

T childValue(T parentValue) {
throw new UnsupportedOperationException();
}
}

ThreadLocal 中 不支持 传递方法 childValue(),该方法由子类 InheritableThreadLocal 重写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
/**
* 重写 childValue(),支持传递
*/
protected T childValue(T parentValue) {
return parentValue;
}

/**
* 变更为操作 inheritableThreadLocals表
*/
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}

/**
* 变更为操作 inheritableThreadLocals表
*/
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}

InheritableThreadLocal 有什么不足?

InheritableThreadLocal 只能做到父线程到子线程的传递,但目前往往会使用线程池,这时候 InheritableThreadLocal 就失效了。

可以使用 TransmittableThreadLocal 解决上述问题,TransmittableThreadLocal 的任务是把 任务提交给线程池时ThreadLocal值传递到 任务执行时

使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<String>();
context.set("value-set-in-parent");

Runnable task = new Runnable();
// 额外的处理,生成修饰了的对象ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);

// =====================================================

// Task中可以读取,值是"value-set-in-parent"
String value = context.get();

那么来看看 TtlRunnable##get 中干了些啥

TransmittableThreadLocal 源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class TtlRunnable implements Runnable, TtlEnhanced {
private final AtomicReference<Object> capturedRef;
private final Runnable runnable;
private final boolean releaseTtlValueReferenceAfterRun;

/**
* 核心
*/
private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
// 保存要传递的 ThreadLocalMap capture()
this.capturedRef = new AtomicReference<Object>(capture());
// 保存原任务
this.runnable = runnable;
// 执行完后是否释放
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}

public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;

// 若发现已经是目标类型了(说明已经被包装过了)直接返回
if (runnable instanceof TtlEnhanced) {
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
// 其实就是包装了一下
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
}

包装时核心的方法来自 TransmittableThreadLocal 中的 静态内部类 Transmitter##capture()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> {
public static class Transmitter {
public static Object capture() {
/*
* 该方法做主要功能只是把 holder 的值做一个拷贝
*/
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
captured.put(threadLocal, threadLocal.copyValue());
}
return captured;
}
}
}

holder 是啥?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// TransmittableThreadLocal 继承自 InheritableThreadLocal
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> {
// holder 是一个 InheritableThreadLocal,存储 线程相关 的一个 Map
// 该 map 即从 **任务提交给线程池时** 传递到 **任务执行时** 的 `ThreadLocal` 值
// 采用 WeakHashMap
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
@Override
protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
}

@Override
protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
}
};
}

holder 中数据的相关操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> {
/**
* 重写 get()
*/
@Override
public final T get() {
T value = super.get();
if (null != value) addValue();
return value;
}

/**
* 重写 set()
*/
@Override
public final void set(T value) {
super.set(value);
// may set null to remove value
if (null == value) removeValue();
else addValue();
}

/**
* 重写 remove()
*/
@Override
public final void remove() {
removeValue();
super.remove();
}

/**
* holder 写
*/
private void addValue() {
if (!holder.get().containsKey(this)) {
holder.get().put(this, null); // WeakHashMap supports null value.
}
}

/**
* holder 删
*/
private void removeValue() {
holder.get().remove(this);
}
}
  1. 在线程A中写 TransmittableThreadLocal 时,便会将值放到 holder 的 map 中,holder 是一个ThreadLocal,也就是线程A持有的
  2. 在线程A中创建一个任务时,将 holder 保存的值保存到任务中,随任务一起传递到线程B中
  3. 在线程B中执行 TtlRunnable##run()

可以看到 TtlRunnable 是把原任务做了一个包装,那么本身也是作为 Runnable,那么最关键的就是 run() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class TtlRunnable implements Runnable, TtlEnhanced {
@Override
public void run() {
// 1. 获取传递来的 ThreadLocal Map
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}

// 核心方法
// 执行前,使用传递来的 ThreadLocal Map,覆盖本地的 ThreadLocalMap
Object backup = replay(captured);
try {
// 执行实际的任务
runnable.run();
} finally {
// 回复本地的 ThreadLocalMap
restore(backup);
}
}
}

核心方法 Transmitter##replay():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> {
public static class Transmitter {
@Nonnull
public static Object replay(@Nonnull Object captured) {
// 入参
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
// 出参
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();

// 1. 迭代本线程中的 holder
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();

// 2. 存储本线程中的 ThreadLocalMap 到 backup
backup.put(threadLocal, threadLocal.get());

// 3. 清除本线程中的 ThreadLocalMap
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}

// 4. 执行 ThreadLocal##set()
// 将传递的 ThreadLocalMap 设置到本线程的 ThreadLocalMap 中
setTtlValuesTo(capturedMap);

doExecuteCallback(true);

return backup;
}

private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue());
}
}
}

private static void doExecuteCallback(boolean isBefore) {
for (Map.Entry<TransmittableThreadLocal<?>, ?> entry : holder.get().entrySet()) {
TransmittableThreadLocal<?> threadLocal = entry.getKey();

try {
if (isBefore) threadLocal.beforeExecute();
else threadLocal.afterExecute();
} catch (Throwable t) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t);
}
}
}
}
}

TransmittableThreadLocal 流程总结:

  1. 在线程A中写 TransmittableThreadLocal 时,便会将值放到 holder 的 map 中,holder 是一个ThreadLocal,也就是线程A持有的
  2. 在线程A中创建一个任务时,使用TtlRunnable做一层包装,将 holder 保存的值保存到任务中,随任务一起传递到线程B中
  3. 在线程B中执行 TtlRunnable##run()
  4. 执行前,使用传递来的 ThreadLocalMap,覆盖本地的 ThreadLocalMap
  5. 执行Runnable##run()
  6. 执行后,恢复本地的 ThreadLocalMap

流程图:

img

总结

  1. ThreadLocal 根本问题是由于这是一个线程无关的变量,而传递的本质是使用一个线程共享的变量进行传递值,因为线程共享,所以需要注意 线程安全问题
  2. TransmittableThreadLocal 继承了 InheritableThreadLocal,因此在第一次被创建时,还是使用的 InheritableThreadLocal 的功能,而后在线程池中,从父线程中继承的 ThreadLocalMap,便是作为本地元素 ThreadLocalMap 被备份和恢复。

本文标题:ThreadLocal系列源码分析

文章作者:Sun

发布时间:2020年06月23日 - 15:06

最后更新:2020年06月23日 - 16:06

原始链接:https://sunyi720.github.io/2020/06/23/Java/ThreadLocal系列源码分析/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。