一、Spark结构

Alt text
  • 使用java、scala、python任意一种语言编写的Spark应用叫Driver
  • Driver程序一般负责初始SparkContext,然后通过SparkContext与整个集群通信,进行分布式计算,比如通过SparkContext创建RDD。鉴于Driver行驶的地位,其角色上有可叫central coordinator
  • SparkContext与集群通信的方式  第一步先通过Cluster Manager申请计算资源Executor 第二步,SparkContext与Executor直接通信,将分布式计算程序发送到每个Executor 第三步,SparkContext发送当前要执行的计算Task给Executor执行  
  • Worker Node是Spark集群中的某个具体节点,也叫slave
  • Executor是在Worker Node上开的一个应用执行器,他会在worknode上起一个JVM, 他可以执行多个Task, Executor是应用隔离的。也即一个Executor只能属于某一个Spark应用,这样Spark集群才能同时服务多个Spark应用,互不干扰。
  • Executor有点像Java中的工作线程一样,可以执行SparkContext发来的多个任务。不同的是Executor是一个独立的JVM进程
  • Cluster Manager是有多种类型,可以是Spark自带的Standalone 集群,也可以是YARN或Mesos集群

二、如何部署Spark程序

以scala为例,我们通过IDE编写Spark应用后,将其打包成jar包,然后使用spark-submit程序进行部署

 ./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
  • --class: 应用的主方法入口
  • --master: cluster manager集群地址,可以是local,也可以是yarn或mesos,或者spark自带的standalone 地址
  • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
  • --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown).
  • application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
  • application-arguments: Arguments passed to the main method of your main class, if any

2.1 上述master参数可配置的值如下

Alt text

2.2 Spark配置优先级

优先级从高到低依次是:

  • 直接在代码中通过SparkConf控制,比如指定cluster manager的master参数,可以在代码中配置 val conf = new SparkConf().setAppName("WordCount").setMaster("local");
  • 在命令中指定,比如: ./bin/spark-submit  --class org.apache.spark.examples.SparkPi  --master yarn  --deploy-mode cluster \  # can be client for client mode --executor-memory 20G  --num-executors 50  /path/to/examples.jar  1000
  • 在spark的安装目录下,通过spark-defaults.conf配置。

三、RDD

RDD是一个统一分布式数据抽象数据集。其下对应实际的数据存储介质,可能是文件,也可以是hadoop。通过RDD可以进行tranformation和action操作,从而实现分布式计算。

3.1 关键数据结构

一个RDD具有以下固定的数据结构

  • 需要应用的计算操作,也即transformation
  • 当前RDD对应的分区列表。因为数据是分区存储的
  • 当前RDD依赖的父数据集。每个RDD都维护一个其依赖关系,这就构成了一个亲缘图谱叫做DAG(Directed Acyclic Graph),中文称作有向无环图。

总结来说,一个RDD的关键信息无非是,定义了数据来源,数据分布存储的情况,以及准备执行的计算逻辑。通过这些新,我们可以构建一个图,图的两个vertex分别是RDD,edge为computation

  private[spark] def conf = sc.conf
  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]//当前RDD需要执行的计算

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]//当前RDD对应的分区

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps//当前RDD依赖的父亲数据集

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  @transient var name: String = null //当前RDD的名称

3.2 RDD 特点

  • 分布式
  • 不可变性,一个RDD生成后,就不可变,所有Transformation操作,都是在原RDD基础上生成新的RDD。
  • 自动容错特性,Spark RDD记录了数据谱系信息(Data lineage),也即check point。这样在某步失败后,可以直接重试那一步,而不用所有计算过程重来。谱系信息记录了,输入的数据,以及处理函数。由于RDD的不可变特性,以及处理函数的幂等性,使得整个重试不会有side effect。依然能保持计算的一致性
  • 没有性能优化 DataFrame会根据用户的sql,自动做性能优化。而RDD要求用户自己组织transformation atcion代码,可能用户组织的不合理,会导致数据频繁在集群间移动
  • 没有结构化信息 DataFrame有字段的名称,类型等信息,但RDD没有
  • Lazy Comuting.只有action时,前面所有的transformation动作才会执行。这节约了空间和时间。试想,如果每个transformation都单独执行一次,那每一次的计算调度都有时间成本,以及中间结果的存储成本

四、Spark的计算流程

  • driver创建RDD
  • RDD通过SparkContext的runJob方法,提交一次数据计算
  • SparkContext最终又交由DAGScheduler的runJob进行计算job执行
  • DAGScheduler使用handleJobSubmitted方法处理job,第一步是根据RDD中的DAG构建Stage列表。不涉及数据移动的transformation会被放到一个stage里面,比如filter和map操作,他们可以并行的在各分区中执行。第二步通过submitStage提交Stage到集群
  • DAGScheduler submitStage再调用submitMissingTasks方法。submitMissingTasks会将stage转化成task
  • task最后通过TaskScheduler提交到spark集群的worknode,进行实际执行

五、RDD Transformation

将RDD进行一系列变换,生成新的RDD的过程,叫做Transformation。所有那些可以就地计算,而不需要数据迁移的transformation叫做Narrow Transformation。

5.1 transformation大概源码

以map操作为例

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    //将传进来的函数f进行clean,这里先不深究,只需要知道clean后的函数,跟原函数功能相同
    
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    //这里返回MapPartitionsRDD对象,其构造参数为当前RDD和一个将f应用于迭代器的函数定义
  }
  • 可以看到map方法并没有执行任何函数,而只是将所有计算过程和数据包装成MapPartitionsRDD后返回。这也就是transformation操作,lazy Computing的特点所在。
  • 所有tranformation返回的都是RDD,比如filter。其余transfomation 函数源码大致同map类似,不再赘述

5.2 flatMap

map操作是将迭代RDD中的每个元素,然后将其做一定加工,返回的的依然是一个元素。而flapMap接受的函数参数的入参是RDD中的每个元素,但对该元素处理后,返回的是一个集合,而不是一个元素。flatMap源码如下:

  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

总结来说,map和flapMap的异同点如下:
- map接受的函数参数签名是:(f: T => U)而flatMap接受的函数参数签名为:(f: T => TraversableOnce[U]),可以看到返回的是集合

  • map和flatMap的返回值都是RDD[T]。也即是说,flatMap拥有将多个集合数据合并,抹平的功效,从该函数的命名也可看出这一点,flat是平的意思

5.2 Narrow Transformations

Narrow Transformation操作有

Alt text

5.3 Wide Transformations

有些计算,需要依赖其他节点数据,这种计算会导致数据移动,成为Wide Transformations。比如,基于某个key分类的操作GroupByKey,这个Key可能散落在不同的work node上,为了进行GroupByKey计算,需要计算节点间进行数据移动,比如将某个Key对应的数据,统一移动到一个节点上。Wide Transformation操作有如下:

Alt text

六、RDD Action

所有Tranformation操作,都不会真正执行,直到Action操作被调用,Action操作返回是具体值,而不是RDD。这种特性成为Lazy Computing.
Action操作触发后,会将执行结果发给Driver 或者写如到外部存储。以下操作属于Action操作:
First(), take(), reduce(), collect(),  count()

6.1 关键action源码

所有action操作,最终都会调用SparkContext的runJob方法。runJob有需多重载方法,以其中一个为例

def runJob[T, U: ClassTag](
      rdd: RDD[T],//需要处理的RDD数据
      processPartition: Iterator[T] => U,//需要在每个数据分区上进行的操作
      resultHandler: (Int, U) => Unit)//如何将上述每个分区处理后的结果进行处理

可以看到runJob中体现了所有分布式计算理论架构,即MapReduce。其中processPartition定义每个分区要需要做的map操作,这一步将减少数据量,将map操作的结果做为输入,传进reduce操作,进行汇总处理。

6.2 aggregate

  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())//1
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)//2
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)//3
    sc.runJob(this, aggregatePartition, mergeResult)//4
    jobResult//5
  }
  1. 定义一个结果汇总变量,它将存储aggregate方法最终的返回结果,初始值为zeroValue
  2. 在每个RDD数据分区上,使用迭代器,应用aggregate方法,初始值为都为zeroValue
  3. 对每个分区的结果,使用combOp方法进行汇总计算。输入index为分区的编号,taskResult为上一步每个分区计算后的结果,同汇总变量jobResult再来进行combOp计算。从第一步可知,jobResult的初始值为zeroValue
  4. 将上述两个函数作为入参,传递给sc.runJob方法,在spark集群进行执行
  5. 返回结果

举例:

    val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
    val result = inputrdd.aggregate(3)(
      (acc, value) => {
        println(acc+":"+value)
        (acc + value._2)
      },
      (acc1, acc2) => (acc1 * acc2)
    )
    println(result)//输出4032

解释:

  • 上述RDD,被切分成两个分区。第一个分区数据是("maths", 21) ,另一个是:("english", 22),("science", 31)
  • (acc + value._2)是每个分区要执行的操作,迭代器带入zeroValue=3后,两个分片的计算中间值如下 3+21=24//分区1 3+22+31=56//分区2
  • 最后将每个分区结果带入(acc1 * acc2)函数,从aggregate源码得知,结果计算也要运用zeroValue,在这里也就是3.于是最终步执行的计算如下: 32456=4032

6.3 fold

fold函数同aggregate类似,同样是调用SparkContext的runJob函数,只不过fold只接受一个值参数,和一个函数参数,其内部在调用runJob时,分区计算和结果计算都使用同样的函数。源码如下:

  def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }

举例:

val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)//1
val result = inputrdd.fold(("test",3))(
  (acc, ele) => {
    println(acc+":"+ele)
    ("result",acc._2 + ele._2)
  }
)
println(result)//输出:(result,83)

假设注释1中切分的2个分区为("maths", 21)和("english", 22),("science", 31),那么执行过程如下:

  1. 3+21=24
  2. 3+22+31=56
  3. 3+24+56=83

6.4 reduce

reduce同样调用了SparkContext的runJob函数,但reduce接收的参数在fold上进一步简化,少了zeroValue参数,只接收一个函数参数即可。同样该参数,在调用runJob时,即作为分区收敛的函数,记作为分区汇总计算的函数

  def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

举例:

    val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
    val result = inputrdd.reduce(
      (acc, ele) => {
        println(acc+":"+ele)
        ("result",acc._2 + ele._2)
      }
    )
    println(result)//结果为:(result,74)

6.5 collect&top

collect和top方法都会将数据收集到driver本地,前者是收集全部,后者是收集指定条数。所以最好知道收集的数据集较小时使用。否则会有很大的性能问题,比如大数量的传输,以及driver本地的内存压力

6.6 reduce和reduceByKey

前者是action操作,后者是transformation操作

七、RDD cache优化

RDD的数据,来至于外部存储介质,比如磁盘。而每一次用该RDD,都要去磁盘加载,这有时间和性能上的损耗。可以使用rdd的cahce方法,将该RDD缓存到内存,这样后续重复使用该RDD时,直接去内存拿。
cache的几个级别

  • MEMORY_ONLY 只缓存到内存,内存装不下的部分,下次用到时再重新计算
  • MEMORY_AND_DISK 缓存到内存,内存装不下的,缓存到磁盘。这样,下次需要时,不在内存部分的数据直接从磁盘获取就行,不用重新计算
  • MEMORY_ONLY_SER 只缓存到内存,但为了节约空间,将缓存对象序列化后存储
  • MEMORY_AND_DISK_SER 缓存到内存,装不下的数据缓存到磁盘,都是用序列化方式存储
  • DISK_ONLY 只缓存到磁盘

7.1 stage

按数据是否在分区间迁移,来划分stage。一个stage有多个task,他们会并发的在不同的分区上执行相同的计算代码。比如紧邻的map和filter就会被划在同一个stage,因为他们可以并发在各分区上执行,而不需要数据移动。而reduceByKey则会单独成为一个stage,因为其涉及到数据移动

八、RDD  lineage & DAG

RDD
从一个RDD转化成另一个RDD时,每一步都会记录上一个RDD关系。于是这形成一个血统谱系。具体

    val wordCount1 = sc.textFile("InputText").flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    println(wordCount1.toDebugString)

最终输出:

(1) ShuffledRDD[4] at reduceByKey at SparkTest.scala:124 []
 +-(1) MapPartitionsRDD[3] at map at SparkTest.scala:124 []
    |  MapPartitionsRDD[2] at flatMap at SparkTest.scala:124 []
    |  InputText MapPartitionsRDD[1] at textFile at SparkTest.scala:124 []
    |  InputText HadoopRDD[0] at textFile at SparkTest.scala:124 []

可以看到结果以倒序的方式输出,有点像java异常时,打出的依赖栈。从最近的依赖点,一直回溯

九、DataFrame

在RDD上进一步封装的数据结构。这种数据结构可以使用SparkSql去操作处理数据,这降低了对分布式数据集的使用难度。因为你只要会sql,就可以进行一些处理

十、GraphX

十一、 如何调优

一个Spark应用最会对应多个JVM进程。分布式driver,以及该应用在每个worknode上起的JVM进程,由于driver担任的协调者角色,实际执行是worknode上的EXECUTOR,所以对于JVM的调优,主要指对Executor的调优。这些JVM进程彼此会通信,比如数据shuffle。所以优化Spark应用的思路主要从以下个方面入手:

  • 做个一个JVM应用,需要关注JVM的垃圾回收情况,各年龄带的内存分配。这个需要基于具体应用具体分析
  • 由于多个JVM进程之间设计跨网络,跨机器的数据传输,那么需要考虑如何减小传输数据量。比如将数据序列化
  • 对于Spark计算框架本身的特点,还有对数据量较大的输入,采用提高并发度,来切分输入大小。频繁使用的RDD,进行缓存,减小集群重复计算加载的开销。将各分区都要用到的公共大变量,提前brodcast到各集群等

11.1 序列化

通过sparkConf conf.set(“spark.serializer”, “org.apache.spark.serializer.KyroSerializer”)来配置,指定数据对象的序列化方式

十二、参考资料

https://data-flair.training/blogs/spark-tutorial/
https://spark.apache.org/docs/1.6.1/