最近在集群上跑spark时发现有些reduceByKey操作结果不符合预期,大致伪代码如下(公司统一用java,就没写成scala,用了scala的简写节省字数)。就是类似WordCount的简单计算,DimType是一个枚举类
JavaPairRDD<DimType, Long> rawRdd=...; JavaPairRDD<DimType, Long> reducedRdd = entryPairRDD .reduceByKey(_+_); List<Tuple2<DimType, Long>> results = reducedRdd.collect(); for (Tuple2<DimType, Long> tuple2 : results) { logger.info("Result: " + tuple2); ...; }
脚本在单节点运行正常,但是设置多个Executor(如spark.executor.instances=2)结果就发生重复项,输出大致如下这样:
Result: (A,1) Result: (A,2) Result: (B,3) Result: (C,3) Result: (B,2) Result: (C,4)
所有枚举项都出现了两次(正好等于executor的实例数),就好像各个Executor之间没有进行reduce一样
出现这个情况的原因比较tricky,因为spark的Shuffle过程会根据key的hashCode来判定相等,而恰恰Enum类的hashCode比较特殊,系统写死了就等于内存地址
public final int hashCode() { return super.hashCode(); }
这就导致在同不同进程里的枚举项被当成了不同的key,于是没有聚合起来
本来重写hashCode就可以解决问题,但坑爹的是Enum.hashCode()还被定义成final方法,无法被子类覆盖。所以只能自己在外面再封装一层对象,然后重新hashCode(),例如用Enum.name().hashCode()。或者干脆就不要用枚举类来做RDD的Key,以免发生类似问题
另外如果用其他自定义类做key的时候,一定要记得重写hashCode和equals,否则跨进程的时候也会发生类似问题
相关推荐
Scala中文分词+SparkML逻辑回归 实现 中文文本分类
Spark跨集群bulk load(6-2)
基于Spark mlib 的垃圾邮件分类 实现文档 使用Scala实现
第1章 Spark简介 1.1 Spark是什么 1.2 Spark生态系统BDAS 1.3 Spark架构 1.4 Spark分布式架构与单机多核架构的异同 1.5 Spark的企业级应用 1.5.1 Spark在Amazon中的应用 1.5.2 Spark在Yahoo!的应用 1.5.3 Spark在...
资源是java连接spark的源码,里面有支持连接hive,spark的方法,内部有两个方法,一个是getMaps,获取一个List对象,用于直接使用,一个是getJson,将获取到的数据转换成json,方便好用,不想下载的可以去我的博客去...
涵盖Spark的架构设计、Spark的集群搭建、Spark内核的解析、Spark SQL、MLLib、GraphX、Spark Streaming、Tachyon、SparkR、Spark多语言编程、Spark常见问题及调优等,并且结合Spark源码细致的解析了Spark内核和四大...
├─Spark-day01 │ 01-[了解]-Spark发展历史和特点介绍.mp4 │ 03-[掌握]-Spark环境搭建-Standalone集群模式.mp4 │ 06-[理解]-Spark环境搭建-On-Yarn-两种模式.mp4 │ 07-[掌握]-Spark环境搭建-On-Yarn-两种...
Apache Spark 2.0 for Beginners English | ISBN: 1785885006 | 2016 | Key Features This book offers an easy introduction to the Spark framework published on the latest version of Apache Spark 2 Perform ...
Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...
主要给大家介绍了关于Spark SQL操作JSON字段的小技巧,文中通过示例代码介绍的非常详细,对大家学习或者使用spark sql具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
在使用spark读取kafka数据时,当spark升级到2.0之后,出现如上问题:之前遇到了,当时在工程里面添加了org.apache.spark.Logging类,能够运行。
星火网用于Spark的分布式神经网络。... 通过运行在EC2上启动一个由5个工人组成的Spark集群 SparkNet/ec2/spark-ec2 --key-pair=key \ --identity-file=key.pem \ --region=eu-west-1 \ --zone=eu-west-1c \ --instan
【大数据学习资料】Spark单value,key-value类型21个算子(图解与源码)
Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 Spark的缓存,变量,shuffle数据等清理及机制 Spark-submit关于参数及部署模式的部分解析 GroupByKey VS ReduceByKey OrderedRDDFunctions...
Spark 入门实战系列,适合初学者,文档包括十部分内容,质量很好,为了感谢文档作者,也为了帮助更多的人入门,传播作者的心血,特此友情转贴: 1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建....
SparkTI (Spark on TiDB)是TiDB基于Apache Spark的独立于原生系统的计算引擎。它将Spark和TiDB深度集成,在原有MySQL Workload之外借助Spark支持了更多样的用户场景和API。这个项目在SparkSQL和Catalyst引擎之外实现...
(1)输入start-all.sh启动hadoop相应进程和相关的端口号 (2)启动Spark分布式集群并查看信息 (3)网页端查看集群信息 (4)启动spark-shell控制台 1:local模式启动 2:登录master服务器,在集群模式下启动 (5)...
spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包...
Spark使得程序员更容易地编写分布式应用,并且能够根据自己的喜好使用Scala、Java或者Python作为开发语言。本书系统讲解了Spark的应用方法,包括如下内容:第1章介绍如何在多种机器上安装Spark,以及如何配置一个...
Spark中的Transformation操作之Key-Value数据类型的算子: Spark中的Action操作: Transformation->map算子: Transformation->flatMap算子: FaltMap算子与Map算子的区别 Action->reduce算子: Action->collect算子: ...