这篇文章主要介绍“coalesce与repartition怎么使用”,在日常操作中,相信很多人在coalesce与repartition怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”coalesce与repartition怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
为定边等地区用户提供了全套网页设计制作服务,及定边网站建设行业解决方案。主营业务为成都网站建设、成都网站制作、定边网站设计,以传统方式定制建设网站,并提供域名空间备案等一条龙服务,秉承以专业、用心的态度为用户提供真诚的服务。我们深信只要达到每一位用户的要求,就会得到认可,从而选择与我们长期合作。这样,我们也可以走得更远!
coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer:Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
一、功能介绍
coalesce算子最基本的功能就是返回一个numPartitions个partition的RDD。
二、使用及注意事项
这个算子的结果默认是窄依赖,举个例子
coalesce(100)
如果你想把1000个partition减少到100个partition,此时不会发生shuffle,而是每一个你设定的新partition都会替代原来的10个partition。如果初始的最大partition是100个,而你想用coalesce(1000)把partition数增至1000,这是不行的。
现在有一个需求,需要将某一个文件做ETL,最后想输出成一个文件,你会怎么办呢?
这样么?
val logs=sc.textFile(args(0),6)//你想初始化6个分区,并行执行,之后再合并成1个文件 logs.map(x=>{ if(x.split("\t").length==72){ val clean=parse(x) //此处是进行了ETL clean } }).coalesce(2).saveAsTextFile(args(1))
如果你同意的话,可以写个demo测试一下,你会发现,仅仅有一个task!在生产上这是绝对不行!因为上述ETL的spark job仅仅有一个stage,你虽然初始化RDD是设定的6个partition,但是在action之前你使用了.coalesce(1),此时会优先使用coalesce里面的partition数量初始化RDD,所以仅仅有一个task。生产中文件很大的话,你就只能用两个节点处理,这样无法发挥集群的优势了。解决:要在coalesce中加shuffle=tule
val logs=sc.textFile(args(0),6) logs.map(x=>{ if(x.split("\t").length==72){ val clean=parse(x) //此处是进行了ETL clean } }).coalesce(2,shuffle = true).saveAsTextFile(args(1))
这样,我们就会有两个stage,stage1是6个并行高速ETL处理,stage2是通过shuffle合并成2个文件
如下图
我们知道了,可以手动设定shuffle的发生,那么问题来了,刚刚我们不能将初始化的分区数变大,如果加上shuffle可不可以呢?答案是可以的~
如果出事RDD为100个分区,你觉得并行度不够,你可以coalesce(1000,shuffle = true),将分区数增加到1000(默认hash partitioner进行重新),当然你也可以使用自定义分区器,但是一定要序列化。
三、总结
coalesce算子默认只能减少分区数量,但是可以通过开启shuffle增加分区数量
coalesce的作用常常是减少分区数,已达到输出时合并小文件的效果。
在一个stage中,coalesce中设定的分区数是优先级最高的,如果想增加并行度,并合并文件,那么请开启coalesce中的shuffle,这样就会变成两个stage。达到并行且合并的效果。
repartition
/** * Return a new RDD that has exactly numPartitions partitions. * * Can increase or decrease the level of parallelism in this RDD. Internally, this uses * a shuffle to redistribute data. * * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, * which can avoid performing a shuffle. * * TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207. */ def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
这个算子前后是一个宽依赖,字面就是重新分区的意思,与coalesce不同,repartition一定会将分区变成numPartitions个的!通过看源码可知,它底层时调用的coalesce算子,并且使用该算子一定会shuffle。
到此,关于“coalesce与repartition怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!
本文题目:coalesce与repartition怎么使用
标题链接:http://lswzjz.com/article/pdjicc.html