Spark cartesian 优化

背景

目前我们的使用场景需要进行离线预测,这时候就需要拼接离线预测数据,很明显需要对 user 数据集和 cate 数据集进行笛卡尔积,但是 user 集合是全量数据,数据量大约为 1.8 亿,spark 中提供了 cartesian 操作,但是 cartesian 操作会迅速消耗大量的内存,并且数据集比较大的时候耗时是指数级增长,其实大部分时候耗时长就算了,=_=我们的数据量级是完全没办法跑出来,单个 Executor 会直接 OOM。这种情况下,我们就不得不去优化,跑不出来 job 是多么的令人绝望。

思考

思考优化的时候我第一反应是使用广播,因为我们的数据集合比较特别,cate 数据集数据量较小,但是 user 数据量非常大,之前也遇到过 Executor OOM 的场景,当时也是使用广播来解决了这个问题,但是之前的场景是使用广播来代替 join 操作,也就是大表数据一对一输出,但是我们现在的场景很明显不符合,是需要大表的数据一对多输出,这时候我想到了 flatMap,没错 flatMap 就是一个数据集一条数据可输出多条数据,这时候就 so easy 啦,只需要遍历大表,将大表中的一条数据与小表中的所有数据拼接成一个 string,然后 split 后返回即可。

实现方式

Before
1
2
3
4
5
6
7
8
9
// user is one data collection
// cate is the other data collection
JavaRDD<String> res = user.cartesian(cate).map(new Function<Tuple2<Tuple2<String, String>, Tuple2<String, String>>, String>() {
@Override
public String call(Tuple2<Tuple2<String, String>, Tuple2<String, String>> tuple2Tuple2Tuple2) throws Exception {
// about map code
return str;
}
});
After
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// user is one data collection
// cate is the other data collection and is the small collection
// sc is JavaSparkContext
List<Tuple2<String, String>> collect = cate.collect();
Broadcast<List<Tuple2<String, String>>> broadcast = sc.broadcast(collect);
JavaRDD<String> combine = user.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
@Override
public Iterator<String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
for(Tuple2<String, String> value : broadcast.value()) {
... // join one user data with all the cate data to one string
}
return Arrays.asList(string.split()).iterator();
}
});

结论

Emmm…不止可以跑出来,而且快了很多…