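0x1 Abstract
This article examines two questions about Flink's closure cleaning mechanism: why does Flink need to clean closures, and how does the source code actually do it? The goal is to help developers understand how Flink's serialization interacts with anonymous inner classes.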

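0x2 Why Flink Needs Closure Cleaning
Flink ships operators to the cluster nodes through serialization, so every user function must be serializable. For convenience, many developers define functions as anonymous inner classes. An anonymous inner class created in an instance context holds an implicit reference to its enclosing object, which forms a closure. If that enclosing object does not implement Serializable, serializing the function fails with an exception. Flink therefore tries to null out such references before distributing the job, at least when the function never actually uses them, so that the job can still be serialized and run. That is the reason closure cleaning exists.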
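The failure described above can be reproduced with plain Java serialization, without Flink. The following is a minimal sketch: the Mapper interface, the Job class, and the isSerializable helper are hypothetical names introduced for illustration and are not part of Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ClosureSerializationDemo {

    /** Hypothetical stand-in for a Flink function interface such as MapFunction. */
    interface Mapper extends Serializable {
        String map(String value);
    }

    /** Enclosing class that is deliberately NOT Serializable. */
    static class Job {
        private final String prefix;

        Job(String prefix) {
            this.prefix = prefix;
        }

        Mapper anonymousMapper() {
            // Reads a field of the enclosing Job, so the anonymous instance
            // keeps a reference to that Job.
            return new Mapper() {
                @Override
                public String map(String value) {
                    return prefix + value;
                }
            };
        }
    }

    /** Static nested class: carries its own data and no reference to an enclosing instance. */
    static class StaticMapper implements Mapper {
        private final String prefix;

        StaticMapper(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String map(String value) {
            return prefix + value;
        }
    }

    /** Returns true if Java serialization can write the object. */
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new Job("job-").anonymousMapper())); // false
        System.out.println(isSerializable(new StaticMapper("job-")));          // true
    }
}
```

The anonymous mapper fails because serializing it also requires serializing the Job it points to. The static nested class has no such reference, which is why the error message in ClosureCleaner recommends it.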
0x3 How Flink Implements Closure Cleaning
Start with a minimal example that uses a map operator:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
    @Override
    public void run(SourceContext<String> ctx) throws Exception {}

    @Override
    public void cancel() {}
});
source.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return null;
    }
});
Next, trace into the source of the map method:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}
Note the call to clean(mapper). Following it further leads to this method in the StreamExecutionEnvironment class:
@Internal
public <F> F clean(F f) {
    if (getConfig().isClosureCleanerEnabled()) {
        ClosureCleaner.clean(f, true);
    }
    ClosureCleaner.ensureSerializable(f);
    return f;
}
At this point it is clear that the core tool for closure cleaning is ClosureCleaner. The rest of this section walks through that class.
First, its clean method:
public static void clean(Object func, boolean checkSerializable) {
    if (func == null) {
        return;
    }
    final Class<?> cls = func.getClass();
    // First find the field name of the "this$0" field, this can
    // be "this$x" depending on the nesting
    boolean closureAccessed = false;
    for (Field f: cls.getDeclaredFields()) {
        if (f.getName().startsWith("this$")) {
            // found a closure referencing field - now try to clean
            closureAccessed |= cleanThis0(func, cls, f.getName());
        }
    }
    if (checkSerializable) {
        try {
            InstantiationUtil.serializeObject(func);
        }
        catch (Exception e) {
            String functionType = getSuperClassOrInterfaceName(func.getClass());
            String msg = functionType == null ?
                    (func + " is not serializable.") :
                    ("The implementation of the " + functionType + " is not serializable.");
            if (closureAccessed) {
                msg += " The implementation accesses fields of its enclosing class, which is "
                        + "a common reason for non-serializability. "
                        + "A common solution is to make the function a proper (non-inner) class, or "
                        + "a static inner class.";
            } else {
                msg += " The object probably contains or references non serializable fields.";
            }
            throw new InvalidProgramException(msg, e);
        }
    }
}
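The method takes two parameters: func is the function object to clean, and checkSerializable controls whether the object is serialized afterwards as a check.
The first step uses reflection to find every field whose name starts with this$. These are the fields that hold the reference to the enclosing instance. The relevant code is: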
for (Field f: cls.getDeclaredFields()) {
    if (f.getName().startsWith("this$")) {
        // found a closure referencing field - now try to clean
        closureAccessed |= cleanThis0(func, cls, f.getName());
    }
}
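To see what this loop finds, the same reflective scan can be run on small test classes. This is a sketch under stated assumptions: OuterReferenceScan, Inner, Nested, and outerReferenceFields are hypothetical names, and the exact suffix after this$ depends on how deeply the classes are nested, so the code only checks the prefix.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class OuterReferenceScan {

    private final int offset;

    OuterReferenceScan(int offset) {
        this.offset = offset;
    }

    /** Inner (non-static) class that reads a field of its enclosing instance. */
    class Inner {
        int shifted(int x) {
            return x + offset;
        }
    }

    /** Static nested class: the compiler generates no outer-instance field for it. */
    static class Nested {
        int identity(int x) {
            return x;
        }
    }

    /** Collects the names of declared fields that start with "this$", as the Flink loop does. */
    static List<String> outerReferenceFields(Class<?> cls) {
        List<String> names = new ArrayList<>();
        for (Field f : cls.getDeclaredFields()) {
            if (f.getName().startsWith("this$")) {
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Inner:  " + outerReferenceFields(Inner.class));
        System.out.println("Nested: " + outerReferenceFields(Nested.class));
    }
}
```

The inner class reports one synthetic field and the static nested class reports none, which matches the advice in the ClosureCleaner error message to use a static inner class.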
For each such field, clean calls the private method cleanThis0. Its source is:
private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) {
    This0AccessFinder this0Finder = new This0AccessFinder(this0Name);
    getClassReader(cls).accept(this0Finder, 0);
    final boolean accessesClosure = this0Finder.isThis0Accessed();
    if (LOG.isDebugEnabled()) {
        LOG.debug(this0Name + " is accessed: " + accessesClosure);
    }
    if (!accessesClosure) {
        Field this0;
        try {
            this0 = func.getClass().getDeclaredField(this0Name);
        } catch (NoSuchFieldException e) {
            // has no this$0, just return
            throw new RuntimeException("Could not set " + this0Name + ": " + e);
        }
        try {
            this0.setAccessible(true);
            this0.set(func, null);
        }
        catch (Exception e) {
            // should not happen, since we use setAccessible
            throw new RuntimeException("Could not set " + this0Name + " to null. " + e.getMessage(), e);
        }
    }
    return accessesClosure;
}
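The key line is this0.set(func, null);, which sets the closure reference to null so that the function no longer holds the enclosing object. Note that this only happens when accessesClosure is false: the method first uses the ASM bytecode library (through This0AccessFinder) to check whether the function's code actually reads the this$ field, and it leaves the reference in place if it does. The details of that check are not covered here; interested readers can consult the source.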
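The nulling step can be tried in isolation with a small, Flink-free sketch. It makes two assumptions that differ from the real ClosureCleaner: it skips the ASM access check entirely, so it is only safe because the example function never touches its enclosing instance, and all names (NullOuterReferenceDemo, Mapper, nullOuterReferences, isSerializable) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;

/** Enclosing class that is deliberately NOT Serializable. */
class NullOuterReferenceDemo {

    /** Hypothetical stand-in for a Flink function interface. */
    interface Mapper extends Serializable {
        String map(String value);
    }

    Mapper upperCaseMapper() {
        // Does not use any member of the enclosing instance.
        return new Mapper() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        };
    }

    /**
     * Sets every "this$..." field of func to null.
     * Assumption: the caller knows func never reads these fields;
     * the real ClosureCleaner verifies that with ASM before nulling.
     */
    static void nullOuterReferences(Object func) {
        for (Field f : func.getClass().getDeclaredFields()) {
            if (f.getName().startsWith("this$")) {
                try {
                    f.setAccessible(true);
                    f.set(func, null);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Could not null " + f.getName(), e);
                }
            }
        }
    }

    /** Returns true if Java serialization can write the object. */
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Mapper mapper = new NullOuterReferenceDemo().upperCaseMapper();
        nullOuterReferences(mapper);
        System.out.println(isSerializable(mapper)); // true once the outer reference is gone
        System.out.println(mapper.map("abc"));      // still works: ABC
    }
}
```

If the function did read a field of its enclosing instance, nulling the reference would turn a serialization error into a NullPointerException at runtime, which is presumably why Flink performs the bytecode check first.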
