RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
SparkStreaming性能调优大全!-创新互联

SparkStreaming性能调优大全!

昌黎网站建设公司创新互联,昌黎网站设计制作,有大型网站制作公司丰富经验。已为昌黎上1000+提供企业网站建设服务。企业网站搭建\成都外贸网站制作要多少钱,请找那个售后服务好的昌黎做网站的公司定做!

一、日志已满:

 spark.executor.logs.rolling.maxSize

下面三个日志rolling参数记得设置:

spark.executor.logs.rolling.strategy size

spark.executor.logs.rolling.maxSize 134217728 #default byte

spark.executor.logs.rolling.maxRetainedFiles

下面是spark1.6的源码:

[spark] RollingFileAppender {
  = = = = = = (* ).toString
  = =

二、Spark Streamingz对Kafka的Offset进行管理

zookeeper.session.timeout.ms

一般跳大3~5倍。

http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/

http://www.tuicool.com/articles/vaUzquJ

SparkStreaming性能调优大全!

[spark] SparkCuratorUtil Logging {

  = = = = (
      conf: SparkConfzkUrlConf: = ): CuratorFramework = {
    ZK_URL = conf.get(zkUrlConf)
    zk = CuratorFrameworkFactory.newClient(ZK_URLExponentialBackoffRetry())
    zk.start()
    zk
  }

三、 spark.task.maxFailures

SparkStreaming性能调优大全!默认4,调整10左右

TaskSetManagerSuite SparkFunSuite LocalSparkContext Logging {
  TaskLocality.{}

  = SparkConf

  = .getTimeAsMs()
  = () {
    .beforeEach()
    FakeRackUtil.cleanUp()
  }

  test() {
    sc = SparkContext()
    sched = FakeTaskScheduler(sc())
    taskSet = FakeTask.createTaskSet()
    clock = ManualClock
    manager = TaskSetManager(schedtaskSetclock)

四、spark.streaming.kafka.maxRetries

默认1,调成3或者5

五、Spark Streaming连接Kafka用Direct方式。

六、怎么调优?入口在哪?

答案就是Spark配置参数的地方:

1. $SPARK_HOME/conf/spark-env.sh 脚本上配置。 配置格式如下:

export SPARK_DAEMON_MEMORY=1024m

2. 编程的方式(程序中在创建SparkContext之前,使用System.setProperty(“xx”,“xxx”)语句设置相应系统属性值),

val conf = new SparkConf()

       .setMaster("local")

       .setAppName("CountingSheep")

       .set("spark.executor.memory", "1g")

val sc = new SparkContext(conf)

3、即在spark-shell下和spark-submit下配置

如:Scala> System.setProperty("spark.akka.frameSize","10240m")

System.setProperty("spark.rpc.askTimeout","800")

./bin/spark-submit --name "My app"

              --master local[4]

              --conf spark.shuffle.spill=false

              --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails

                    -XX:+PrintGCTimeStamps"

              myApp.jar

spark-submit也会从默认配置文件conf/spark-defaults.conf里选取配置项,格式如下:

spark.master       spark://iteblog.com:7077

spark.executor.memory  512m

spark.eventLog.enabled  true

spark.serializer     org.apache.spark.serializer.KryoSerializer

(一)环境变量spark-env.sh配置项

SCALA_HOME             #指向你的scala安装路径

MESOS_NATIVE_LIBRARY  #如果你要在Mesos上运行集群的话

SPARK_WORKER_MEMORY #作业可使用的内存容量,默认格式1000M或者 2G (默认: 所有RAM去掉给操作系统用的1 GB);每个作业独立的内存空间由SPARK_MEM决定。

SPARK_JAVA_OPTS  #添加JVM选项。你可以通过-D来获取任何系统属性

eg: SPARK_JAVA_OPTS+="-Dspark.kryoserializer.buffer.mb=1024"

SPARK_MEM    #设置每个节点所能使用的内存总量。他们应该和JVM‘s -Xmx选项的格式保持一致(e.g.300m或1g)。注意:这个选项将很快被弃用支持系统属性spark.executor.memory,所以我们推荐将它使用在新代码中。

SPARK_DAEMON_MEMORY  #分配给Spark master和worker守护进程的内存空间(默认512M)

SPARK_DAEMON_JAVA_OPTS #Spark master和worker守护进程的JVM选项(默认:none)

(二)System Properties

  • spark.akka.frameSize: 控制Spark中通信消息的大容量 (如 task 的输出结果),默认为10M。当处理大数据时,task 的输出可能会大于这个值,需要根据实际数据设置一个更高的值。如果是这个值不够大而产生的错误,可以从 worker的日志 中进行排查。通常 worker 上的任务失败后,master 的运行日志上出现”Lost TID: “的提示,可通过查看失败的 worker 的日志文件($SPARK_HOME/worker/下面的log文件) 中记录的任务的 Serialized size of result 是否超过10M来确定。

  • spark.default.parallelism: 控制Spark中的分布式shuffle过程默认使用的task数量,默认为8个。如果不做调整,数据量大时,就容易运行时间很长,甚至是出Exception,因为8个task无法handle那么多的数据。 注意这个值也不是说设置得越大越好。

  • spark.local.dir:Spark 运行时的临时目录,例如 map 的输出文件,保存在磁盘的 RDD 等都保存在这里。默认是 /tmp 这个目录,而一开始我们搭建的小集群上 /tmp 这个目录的空间只有2G,大数据量跑起来就出 Exception (”No space left on device”)了。

如何如何查看已配置好并生效的参数?

通过webui来进行查看,http://master:4040/

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


当前题目:SparkStreaming性能调优大全!-创新互联
网址分享:http://lswzjz.com/article/dsiojc.html