发布时间:2025-12-09 16:31:12 浏览次数:4
本文已收录至Github,推荐阅读 👉 Java随想录
学习一个东西之前总要知道这个东西是什么。
Spark 是一个开源的大数据处理引擎,它提供了一整套开发 API,包括流计算和机器学习。它支持批处理和流处理。
Spark 的一个显著特点是它能够在内存中进行迭代计算,从而加快数据处理速度。尽管 Spark 是用 Scala 开发的,但它也为 Java、Scala、Python 和 R 等高级编程语言提供了开发接口。
Spark提供了6大组件:
Spark Core 是 Spark 的基础,它提供了内存计算的能力,是分布式处理大数据集的基础。它将分布式数据抽象为弹性分布式数据集(RDD),并为运行在其上的上层组件提供 API。所有 Spark 的上层组件都建立在 Spark Core 的基础之上。
Spark SQL 是一个用于处理结构化数据的 Spark 组件。它允许使用 SQL 语句查询数据。Spark 支持多种数据源,包括 Hive 表、Parquet 和 JSON 等。
Spark Streaming 是一个用于处理动态数据流的 Spark 组件。它能够开发出强大的交互和数据查询程序。在处理动态数据流时,流数据会被分割成微小的批处理,这些微小批处理将会在 Spark Core 上按时间顺序快速执行。
Spark MLlib 是 Spark 的机器学习库。它提供了常用的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维等。MLlib 还提供了一些底层优化原语和高层流水线 API,可以帮助开发人员更快地创建和调试机器学习流水线。
Spark GraphX 是 Spark 的图形计算库。它提供了一种分布式图形处理框架,可以帮助开发人员更快地构建和分析大型图形。
Spark 有许多优势,其中一些主要优势包括:
速度:Spark 基于内存计算,能够比基于磁盘的计算快很多。对于迭代式算法和交互式数据挖掘任务,这种速度优势尤为明显。
易用性:Spark 支持多种语言,包括 Java、Scala、Python 和 R。它提供了丰富的内置 API,可以帮助开发人员更快地构建和运行应用程序。
通用性:Spark 提供了多种组件,可以支持不同类型的计算任务,包括批处理、交互式查询、流处理、机器学习和图形处理等。
兼容性:Spark 可以与多种数据源集成,包括 Hadoop 分布式文件系统(HDFS)、Apache Cassandra、Apache HBase 和 Amazon S3 等。
容错性:Spark 提供了弹性分布式数据集(RDD)抽象,可以帮助开发人员更快地构建容错应用程序。
下面是一个简单的Word Count的Spark程序:
import org.apache.spark.{SparkConf, SparkContext}object SparkWordCount {def main (args:Array [String]): Unit = {//setMaster("local[9]") 表示在本地运行 Spark 程序,使用 9 个线程。local[*] 表示使用所有可用的处理器核心。//这种模式通常用于本地测试和开发。val conf = new SparkConf ().setAppName ("Word Count").setMaster("local[9]");val sc = new SparkContext (conf);sc.setLogLevel("ERROR")val data = List("Hello World", "Hello Spark")val textFile = sc.parallelize(data)val wordCounts = textFile.flatMap (line => line.split (" ")).map (word => (word, 1)).reduceByKey ( (a, b) => a + b)wordCounts.collect().foreach(println)}}程序首先创建了一个 SparkConf 对象,用来设置应用程序名称和运行模式。然后,它创建了一个 SparkContext 对象,用来连接到 Spark 集群。
接下来,程序创建了一个包含两个字符串的列表,并使用 parallelize 方法将其转换为一个 RDD。然后,它使用 flatMap 方法将每一行文本拆分成单词,并使用 map 方法将每个单词映射为一个键值对(key-value pair),其中键是单词,值是 1。
最后,程序使用 reduceByKey 方法将具有相同键的键值对进行合并,并对它们的值进行求和。最终结果是一个包含每个单词及其出现次数的 RDD。程序使用 collect 方法将结果收集到驱动程序,并使用 foreach 方法打印出来。
Spark的理论较多,所以先了解一下基本概念,有助于后面展开学习Spark。
用户编写的Spark应用程序。
如下,"Word Count"就是该应用程序的名字。
import org.apache.spark.sql.SparkSessionobject WordCount {def main(args: Array[String]) {// 创建 SparkSession 对象,它是 Spark Application 的入口val spark = SparkSession.builder.appName("Word Count").getOrCreate()// 读取文本文件并创建 Datasetval textFile = spark.read.textFile("hdfs://...")// 使用 flatMap 转换将文本分割为单词,并使用 reduceByKey 转换计算每个单词的数量val counts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()// 将结果保存到文本文件中counts.write.text("hdfs://...")// 停止 SparkSessionspark.stop()}}Driver 是运行 Spark Application 的进程,它负责创建 SparkSession 和 SparkContext 对象,并将代码转换为转换和操作操作。它还负责创建逻辑和物理计划,并与集群管理器协调调度任务。
简而言之,Spark Application 是使用 Spark API 编写的程序,而 Spark Driver 是负责运行该程序并与集群管理器协调的进程。
可以将Driver 理解为运行 Spark Application main 方法的进程。
driver的内存大小可以进行设置:
# 设置 driver内存大小driver-memory 1024m在Spark中,Master是独立集群的控制者,而Worker是工作者。一个Spark独立集群需要启动一个Master和多个Worker。Worker就是物理节点,可以在上面启动Executor进程。
在每个Worker上为某应用启动的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个任务都有各自独立的Executor。Executor是一个执行Task的容器。实际上它是一组计算资源(cpu核心、memory)的集合。
一个Worker节点可以有多个Executor。一个Executor可以运行多个Task。
executor创建成功后,在日志文件会显示如下信息:
INFO Executor: Starting executor ID [executorId] on host [executorHostname]
一个Job包含多个RDD及作用于相应RDD上的各种操作,每个Action的触发就会生成一个job。用户提交的Job会提交给DAGScheduler,Job会被分解成Stage,Stage会被细化成Task。
被发送到executor上的工作单元。每个Task负责计算一个分区的数据。
在 Spark 中,一个作业(job)会被划分为多个阶段(stage)。同一个 Stage 可以有多个 Task 并行执行(task 数=分区数)。
阶段之间的划分是根据数据的依赖关系来确定的。当一个 RDD 的分区依赖于另一个 RDD 的分区时,这两个 RDD 就属于同一个阶段。当一个 RDD 的分区依赖于多个 RDD 的分区时,这些 RDD 就属于不同的阶段。
上图中,stage表示一个可以顺滑完成的阶段,就是可以单机运行。曲线表示Shuffle。
如果stage能够复用前面的stage的话,那么会显示灰色。
Stage的划分,简单的说是以宽依赖来划分:
对于窄依赖,partition 的转换处理在 stage 中完成计算,不划分(将窄依赖尽量放在在同一个 stage 中,可以实现流水线计算)。
对于宽依赖,由于有 shuffle 的存在,只能在父 RDD 处理完成后,才能开始接下来的计算,也就是说需要要划分 stage。
Spark 会根据 shuffle/宽依赖使用回溯算法来对 DAG 进行 Stage 划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到当前的 stage/阶段中。
Spark会根据RDD之间的依赖关系将DAG图划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。
至于什么是窄依赖和宽依赖,下面马上就会提及。
父 RDD 的一个分区只会被子 RDD 的一个分区依赖。比如:map/filter和union,这种依赖称之为窄依赖。
窄依赖的多个分区可以并行计算,并且窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。
指子RDD的分区依赖于父RDD的所有分区,这是因为shuffle类操作,称之为宽依赖。
对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。
在 Spark 中,shuffle 是指在不同阶段之间重新分配数据的过程。它通常发生在需要对数据进行聚合或分组操作的时候,例如 reduceByKey 或 groupByKey 等操作。
在 shuffle 过程中,Spark 会将数据按照键值进行分区,并将属于同一分区的数据发送到同一个计算节点上。这样,每个计算节点就可以独立地处理属于它自己分区的数据。
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD的Partition是指数据集的分区。它是数据集中元素的集合,这些元素被分区到集群的节点上,可以并行操作。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
一个函数会被作用在每一个分区。Spark 中 RDD 的计算是以分片为单位的,compute 函数会被作用到每个分区上。
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
有向无环图,其实说白了就是RDD之间的依赖关系图。
Spark的执行流程大致如下:
Spark 支持多种运行模式,包括本地模式、独立模式、Mesos 模式、YARN 模式和 Kubernetes 模式。
RDD的概念在Spark中十分重要,上面只是简单的介绍了一下,下面详细的对RDD展开介绍。
RDD是“Resilient Distributed Dataset”的缩写,从全称就可以了解到RDD的一些典型特性:
RDD里面的数据集会被逻辑分成若干个分区,这些分区是分布在集群的不同节点的,基于这样的特性,RDD才能在集群不同节点并行计算。
Spark RDD运算数据是在内存中进行的,在内存足够的情况下,不会把中间结果存储在磁盘,所以计算速度非常高效。
所有的转换操作都是惰性的,也就是说不会立即执行任务,只是把对数据的转换操作记录下来而已。只有碰到action操作才会被真正的执行。
Spark RDD具备容错特性,在RDD失效或者数据丢失的时候,可以根据DAG从父RDD重新把数据集计算出来,以达到数据容错的效果。
RDD是进程安全的,因为RDD是不可修改的。它可以在任何时间点被创建和查询,使得缓存,共享,备份都非常简单。在计算过程中,是RDD的不可修改特性保证了数据的一致性。
可以调用cache或者persist函数,把RDD缓存在内存、磁盘,下次使用的时候不需要重新计算而是直接使用。
RDD支持两种操作:
转换操作以RDD做为输入参数,然后输出一个或者多个RDD。转换操作不会修改输入RDD。Map()、Filter()这些都属于转换操作。
转换操作是惰性求值操作,只有在碰到行动操作(Actions)的时候,转换操作才会真正实行。转换操作分两种:窄依赖和宽依赖(上文提到过)。
下面是一些常见的转换操作:
| map | 将函数应用于 RDD 中的每个元素,并返回一个新的 RDD |
| filter | 返回一个新的 RDD,其中包含满足给定谓词的元素 |
| flatMap | 将函数应用于 RDD 中的每个元素,并将返回的迭代器展平为一个新的 RDD |
| union | 返回一个新的 RDD,其中包含两个 RDD 的元素 |
| distinct | 返回一个新的 RDD,其中包含原始 RDD 中不同的元素 |
| groupByKey | 将键值对 RDD 中具有相同键的元素分组到一起,并返回一个新的 RDD |
| reduceByKey | 将键值对 RDD 中具有相同键的元素聚合到一起,并返回一个新的 RDD |
| sortByKey | 返回一个新的键值对 RDD,其中元素按照键排序 |
Action是数据执行部分,其通过执行count,reduce,collect等方法真正执行数据的计算部分。
| reduce | 通过函数聚合 RDD 中的所有元素 |
| collect | 将 RDD 中的所有元素返回到驱动程序 |
| count | 返回 RDD 中的元素个数 |
| first | 返回 RDD 中的第一个元素 |
| take | 返回 RDD 中的前 n 个元素 |
| takeOrdered | 返回 RDD 中的前 n 个元素,按照自然顺序或指定的顺序排序 |
| saveAsTextFile | 将 RDD 中的元素保存到文本文件中 |
| foreach | 将函数应用于 RDD 中的每个元素 |
创建RDD有3种不同方式:
由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、HBase 等:
val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")通过已有的 RDD 经过算子转换生成新的 RDD:
val rdd2=rdd1.flatMap(_.split(" "))其实makeRDD 方法底层调用了 parallelize 方法:
RDD 缓存是在内存存储RDD计算结果的一种优化技术。把中间结果缓存起来以便在需要的时候重复使用,这样才能有效减轻计算压力,提升运算性能。
要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)rdd2.cache //缓存/持久化rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了需要注意的是,在触发action的时候,才会去执行持久化。
cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()方法。
rdd.persist(StorageLevel.MEMORY_ONLY)rdd.unpersist()RDD存储级别主要有以下几种。
| MEMORY_ONLY | 高 | 低 | 是 | 否 | 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。 |
| MEMORY_ONLY_2 | 高 | 低 | 是 | 否 | 数据存2份 |
| MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化。这种方式更加节省内存 |
| MEMORY_ONLY_SER_2 | 低 | 高 | 是 | 否 | 数据序列化,数据存2份 |
| MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 如果数据在内存中放不下,则溢写到磁盘 |
| MEMORY_AND_DISK_2 | 高 | 中等 | 部分 | 部分 | 数据存2份 |
| MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化 |
| MEMORY_AND_DISK_SER_2 | 低 | 高 | 部分 | 部分 | 数据存2份 |
| DISK_ONLY | 低 | 高 | 否 | 是 | 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。 |
| DISK_ONLY_2 | 低 | 高 | 否 | 是 | 数据存2份 |
| OFF_HEAP | 这个目前是试验型选项,类似MEMORY_ONLY_SER,但是数据是存储在堆外内存的。 |
对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉了,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。
血缘关系是指 RDD 之间的依赖关系。当你对一个 RDD 执行转换操作时,Spark 会生成一个新的 RDD,并记录这两个 RDD 之间的依赖关系。这种依赖关系就是血缘关系。
血缘关系可以帮助 Spark 在发生故障时恢复数据。当一个分区丢失时,Spark 可以根据血缘关系重新计算丢失的分区,而不需要从头开始重新计算整个 RDD。
血缘关系还可以帮助 Spark 优化计算过程。Spark 可以根据血缘关系合并多个连续的窄依赖转换,减少数据传输和通信开销。
我们可以执行toDebugString打印RDD的依赖关系。
下面是一个简单的例子:
val conf = new SparkConf().setAppName("Lineage Example").setMaster("local")val sc = new SparkContext(conf)val data = sc.parallelize(List(1, 2, 3, 4, 5))val mappedData = data.map(x => x + 1)val filteredData = mappedData.filter(x => x % 2 == 0)println(filteredData.toDebugString)在这个例子中,我们首先创建了一个包含 5 个元素的 RDD,并对它执行了两个转换操作:map 和 filter。然后,我们使用 toDebugString 方法打印了最终 RDD 的血缘关系。
运行这段代码后,你会看到类似下面的输出:
(2) MapPartitionsRDD[2] at filter at <console>:26 []| MapPartitionsRDD[1] at map at <console>:24 []| ParallelCollectionRDD[0] at parallelize at <console>:22 []这个输出表示最终的 RDD 是通过两个转换操作(map 和 filter)从原始的 ParallelCollectionRDD 转换而来的。
CheckPoint可以将RDD从其依赖关系中抽出来,保存到可靠的存储系统(例如HDFS,S3等), 即它可以将数据和元数据保存到检查指向目录中。 因此,在程序发生崩溃的时候,Spark可以恢复此数据,并从停止的任何地方开始。
CheckPoint分为两类:
开发人员可以使用RDD.checkpoint()方法来设置检查点。在使用检查点之前,必须使用SparkContext.setCheckpointDir(directory: String)方法设置检查点目录。
下面是一个简单的例子:
import org.apache.spark.{SparkConf, SparkContext}object CheckpointExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Checkpoint Example").setMaster("local")val sc = new SparkContext(conf)// 设置 checkpoint 目录sc.setCheckpointDir("/tmp/checkpoint")val data = sc.parallelize(List(1, 2, 3, 4, 5))val mappedData = data.map(x => x + 1)val filteredData = mappedData.filter(x => x % 2 == 0)// 对 RDD 进行 checkpointfilteredData.checkpoint()// 触发 checkpointfilteredData.count()}}RDD的检查点机制就好比Hadoop将中间计算值存储到磁盘,即使计算中出现了故障,我们也可以轻松地从中恢复。通过对 RDD 启动检查点机制可以实现容错和高可用。
| —master | master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local。具体指可参考下面关于Master_URL的列表 |
| —deploy-mode | 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client |
| —class | 应用程序的主类,仅针对 java 或 scala 应用 |
| —name | 应用程序的名称 |
| —jars | 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下 |
| —packages | 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标 |
| —exclude-packages | 为了避免冲突 而指定不包含的 package |
| —repositories | 远程 repository |
| —conf PROP=VALUE | 指定 spark 配置属性的值, 例如 -conf spark.executor.extraJavaOptions=”-XX:MaxPermSize=256m” |
| —properties-file | 加载的配置文件,默认为 conf/spark-defaults.conf |
| —driver-memory | Driver内存,默认 1G |
| —driver-java-options | 传给 driver 的额外的 Java 选项 |
| —driver-library-path | 传给 driver 的额外的库路径 |
| —driver-class-path | 传给 driver 的额外的类路径 |
| —driver-cores | Driver 的核数,默认是1。在 yarn 或者 standalone 下使用 |
| —executor-memory | 每个 executor 的内存,默认是1G |
| —total-executor-cores | 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用 |
| —num-executors | 启动的 executor 数量。默认为2。在 yarn 下使用 |
| —executor-core | 每个 executor 的核数。在yarn或者standalone下使用 |
| local | 使用1个worker线程在本地运行Spark应用程序 |
| local[K] | 使用K个worker线程在本地运行Spark应用程序 |
| local | 使用所有剩余worker线程在本地运行Spark应用程序 |
| spark://HOST:PORT | 连接到Spark Standalone集群,以便在该集群上运行Spark应用程序 |
| mesos://HOST:PORT | 连接到Mesos集群,以便在该集群上运行Spark应用程序 |
| yarn-client | 以client方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver在client运行。 |
| yarn-cluster | 以cluster方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver也在集群中运行。 |
一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,所以,Spark提供了两种共享变量:广播变量(broadcast variable)和累加器(accumulator)。
广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。说白了其实就是共享变量。
如果Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:
import org.apache.spark.{SparkConf, SparkContext}object BroadcastExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Broadcast Example").setMaster("local")val sc = new SparkContext(conf)val data = sc.parallelize(List(1, 2, 3, 4, 5))// 创建一个广播变量val factor = sc.broadcast(2)// 使用广播变量val result = data.map(x => x * factor.value)result.collect().foreach(println)}}广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量v,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改。
累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现counters和sums。
一个累加器可以通过调用SparkContext.accumulator(v)方法从一个初始变量v中创建。运行在集群上的任务可以通过add方法或者使用+=操作来给它加值。然而,它们无法读取这个值。只有驱动程序可以使用value方法来读取累加器的值。
示例代码如下:
import org.apache.spark.{SparkConf, SparkContext}object AccumulatorExample {def main(args: Array[String]) {val conf = new SparkConf().setAppName("AccumulatorExample")val sc = new SparkContext(conf)val accum = sc.longAccumulator("My Accumulator")sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))println(accum.value) // 输出 10}}这个示例中,我们创建了一个名为 My Accumulator 的累加器,并使用 sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) 来对其进行累加。最后,我们使用 println(accum.value) 来输出累加器的值,结果为 10。
我们可以利用子类AccumulatorParam创建自己的累加器类型。AccumulatorParam接口有两个方法:zero方法为你的数据类型提供一个“0 值”(zero value);addInPlace方法计算两个值的和。例如,假设我们有一个Vector类代表数学上的向量,我们能够如下定义累加器:
object VectorAccumulatorParam extends AccumulatorParam[Vector] {def zero(initialValue: Vector): Vector = {Vector.zeros(initialValue.size)}def addInPlace(v1: Vector, v2: Vector): Vector = {v1 += v2}}// Then, create an Accumulator of this type:val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。它提供了一个称为DataFrame的编程抽象,并且可以充当分布式SQL查询引擎。
无缝地将SQL查询与Spark程序混合。 Spark SQL允许将结构化数据作为Spark中的分布式数据集(RDD)进行查询,在Python,Scala和Java中集成了API。这种紧密的集成使得可以轻松地运行SQL查询以及复杂的分析算法。
在现有仓库上运行未修改的Hive查询。 Spark SQL重用了Hive前端和MetaStore,提供与现有Hive数据,查询和UDF的完全兼容性。只需将其与Hive一起安装即可。
通过JDBC或ODBC连接。 Spark SQL包括具有行业标准JDBC和ODBC连接的服务器模式。
对于交互式查询和长查询使用相同的引擎。 Spark SQL利用RDD模型来支持中查询容错,使其能够扩展到大型作业。不要担心为历史数据使用不同的引擎。
Spark SQL 支持多种数据类型,包括数字类型、字符串类型、二进制类型、布尔类型、日期时间类型和区间类型等。
数字类型包括:
字符串类型包括:
二进制类型包括:
布尔类型包括:
日期时间类型包括:
区间类型包括:
YearMonthIntervalType (startField, endField):表示由以下字段组成的连续子集组成的年月间隔:MONTH(月份),YEAR(年份)。
DayTimeIntervalType (startField, endField):表示由以下字段组成的连续子集组成的日时间间隔:SECOND(秒),MINUTE(分钟),HOUR(小时),DAY(天)。
复合类型包括:
DataFrame 是 Spark 中用于处理结构化数据的一种数据结构。它类似于关系数据库中的表,具有行和列。每一列都有一个名称和一个类型,每一行都是一条记录。
DataFrame 支持多种数据源,包括结构化数据文件、Hive 表、外部数据库和现有的 RDD。它提供了丰富的操作,包括筛选、聚合、分组、排序等。
DataFrame 的优点在于它提供了一种高级的抽象,使得用户可以使用类似于 SQL 的语言进行数据处理,而无需关心底层的实现细节。此外,Spark 会自动对 DataFrame 进行优化,以提高查询性能。
下面是一个使用DataFrame的代码例子:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()import spark.implicits._val data = Seq(("Alice", 25),("Bob", 30),("Charlie", 35))val df = data.toDF("name", "age")df.show()在这个示例中,我们首先创建了一个 SparkSession 对象,然后使用 toDF 方法将一个序列转换为 DataFrame。最后,我们使用 show 方法来显示 DataFrame 的内容。
在 Scala 中,可以通过以下几种方式创建 DataFrame:
在 Spark 中,可以使用两种方式对 DataFrame 进行查询:DSL(Domain-Specific Language)和 SQL。
DSL 是一种特定领域语言,它提供了一组用于操作 DataFrame 的方法。例如,下面是一个使用 DSL 进行查询的例子:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()import spark.implicits._val df = Seq(("Alice", 25),("Bob", 30),("Charlie", 35)).toDF("name", "age")df.select("name", "age").filter($"age" > 25).show()SQL 是一种结构化查询语言,它用于管理关系数据库系统。在 Spark 中,可以使用 SQL 对 DataFrame 进行查询。例如,下面是一个使用 SQL 进行查询的例子:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()import spark.implicits._val df = Seq(("Alice", 25),("Bob", 30),("Charlie", 35)).toDF("name", "age")df.createOrReplaceTempView("people")spark.sql("SELECT name, age FROM people WHERE age > 25").show()DSL 和 SQL 的区别在于语法和风格。DSL 使用方法调用链来构建查询,而 SQL 使用声明式语言来描述查询。选择哪种方式取决于个人喜好和使用场景。
Spark SQL 支持多种数据源,包括 Parquet、JSON、CSV、JDBC、Hive 等。
下面是示例代码:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Data Sources Example").getOrCreate()// Parquetval df = spark.read.parquet("path/to/parquet/file")// JSON val df = spark.read.json("path/to/json/file")// CSVval df = spark.read.option("header", "true").csv("path/to/csv/file")// JDBCval df = spark.read.format("jdbc").option("url", "jdbc:mysql://host:port/database").option("dbtable", "table").option("user", "username").option("password", "password").load()df.show()在 Spark 中,load 函数用于从外部数据源读取数据并创建 DataFrame,而 save 函数用于将 DataFrame 保存到外部数据源。
下面是从 Parquet 文件中读取数据并创建 DataFrame 的示例代码:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()val df = spark.read.load("path/to/parquet/file")df.show()下面是将 DataFrame 保存到 Parquet 文件的示例代码:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()import spark.implicits._val df = Seq(("Alice", 25),("Bob", 30),("Charlie", 35)).toDF("name", "age")df.write.save("path/to/parquet/file")Spark SQL 提供了丰富的内置函数,包括数学函数、字符串函数、日期时间函数、聚合函数等。你可以在 Spark SQL 的官方文档中查看所有可用的内置函数。
此外,Spark SQL 还支持自定义函数(User-Defined Function,UDF),可以让用户编写自己的函数并在查询中使用。
下面是一个使用 SQL 语法编写自定义函数的示例代码:
import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.functions.udfval spark = SparkSession.builder.appName("UDF Example").getOrCreate()import spark.implicits._val df = Seq(("Alice", 25),("Bob", 30),("Charlie", 35)).toDF("name", "age")df.createOrReplaceTempView("people")val square = udf((x: Int) => x * x)spark.udf.register("square", square)spark.sql("SELECT name, square(age) FROM people").show()在这个示例中,我们首先定义了一个名为 square 的自定义函数,它接受一个整数参数并返回它的平方。然后,我们使用 createOrReplaceTempView 方法创建一个临时视图,并使用 udf.register 方法注册自定义函数。最后,我们使用 spark.sql 方法执行 SQL 查询,并在查询中调用自定义函数。
DataSet 是 Spark 1.6 版本中引入的一种新的数据结构,它提供了 RDD 的强类型和 DataFrame 的查询优化能力。
在 Scala 中,可以通过以下几种方式创建 DataSet:
DataSet 和 DataFrame 都是 Spark 中用于处理结构化数据的数据结构。它们都提供了丰富的操作,包括筛选、聚合、分组、排序等。
它们之间的主要区别在于类型安全性。DataFrame 是一种弱类型的数据结构,它的列只有在运行时才能确定类型。这意味着,在编译时无法检测到类型错误,只有在运行时才会抛出异常。
而 DataSet 是一种强类型的数据结构,它的类型在编译时就已经确定。这意味着,如果你试图对一个不存在的列进行操作,或者对一个列进行错误的类型转换,编译器就会报错。
此外,DataSet 还提供了一些额外的操作,例如 map、flatMap、reduce 等。
RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换。
可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可。
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型在DataFrame需要针对各个字段处理时极为方便。
注意:
在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用。
Spark Streaming 的工作原理是将实时数据流拆分为小批量数据,并使用 Spark 引擎对这些小批量数据进行处理。这种微批处理(Micro-Batch Processing)的方式使得 Spark Streaming 能够以近乎实时的延迟处理大规模的数据流。
下面是一个简单的 Spark Streaming 示例代码:
import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}val conf = new SparkConf().setAppName("Spark Streaming Example")val ssc = new StreamingContext(conf, Seconds(1))val lines = ssc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))val pairs = words.map(word => (word, 1))val wordCounts = pairs.reduceByKey(_ + _)wordCounts.print()ssc.start()ssc.awaitTermination()我们首先创建了一个 StreamingContext 对象,并指定了批处理间隔为 1 秒。然后,我们使用 socketTextStream 方法从套接字源创建了一个 DStream。接下来,我们对 DStream 进行了一系列操作,包括 flatMap、map 和 reduceByKey。最后,我们使用 print 方法打印出单词计数的结果。
Spark Streaming 作为一种实时流处理框架,具有以下优点:
但是,Spark Streaming 也有一些缺点:
DStream(离散化流)是 Spark Streaming 中用于表示实时数据流的一种抽象。它由一系列连续的 RDD 组成,每个 RDD 包含一段时间内收集到的数据。
在 Spark Streaming 中,可以通过以下几种方式创建 DStream:
总结: 简单来说 DStream 就是对 RDD 的封装,你对 DStream 进行操作,就是对 RDD 进行操作。对于 DataFrame/DataSet/DStream 来说本质上都可以理解成 RDD。
在 Spark Streaming 中,窗口函数用于对 DStream 中的数据进行窗口化处理。它允许你对一段时间内的数据进行聚合操作。
Spark Streaming 提供了多种窗口函数,包括:
下面是一个使用窗口函数的示例代码:
import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}val conf = new SparkConf().setAppName("Window Example")val ssc = new StreamingContext(conf, Seconds(1))val lines = ssc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))val pairs = words.map(word => (word, 1))val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))wordCounts.print()ssc.start()ssc.awaitTermination()在这个示例中,我们首先创建了一个 DStream,并对其进行了一系列转换操作。然后,我们使用 reduceByKeyAndWindow 函数对 DStream 进行窗口化处理,指定了窗口大小为 30 秒,滑动间隔为 10 秒。最后,我们使用 print 方法打印出单词计数的结果。
Spark Streaming允许DStream的数据输出到外部系统,如数据库或文件系统,输出的数据可以被外部系统所使用,该操作类似于RDD的输出操作。Spark Streaming支持以下输出操作:
Structured Streaming 是 Spark 2.0 版本中引入的一种新的流处理引擎。它基于 Spark SQL 引擎,提供了一种声明式的 API 来处理结构化数据流。
与 Spark Streaming 相比,Structured Streaming 具有以下优点:
下面是一个简单的 Structured Streaming 示例代码:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()import spark.implicits._val words = lines.as[String].flatMap(_.split(" "))val wordCounts = words.groupBy("value").count()val query = wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()在这个示例中,我们首先创建了一个 SparkSession 对象。然后,我们使用 readStream 方法从套接字源创建了一个 DataFrame。接下来,我们对 DataFrame 进行了一系列操作,包括 flatMap、groupBy 和 count。最后,我们使用 writeStream 方法将结果输出到控制台。
Structured Streaming 同样支持 DSL 和 SQL 语法。
DSL 语法:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()import spark.implicits._val words = lines.as[String].flatMap(_.split(" "))val wordCounts = words.groupBy("value").count()val query = wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()SQL 语法:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()lines.createOrReplaceTempView("lines")val wordCounts = spark.sql("""|SELECT value, COUNT(*) as count|FROM (| SELECT explode(split(value, ' ')) as value| FROM lines|)|GROUP BY value""".stripMargin)val query = wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()Structured Streaming 支持多种输入源,包括文件源(如文本文件、Parquet 文件、JSON 文件等)、Kafka、Socket 等。下面是一个使用 Scala 语言从 Kafka 中读取数据的例子:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()// 订阅一个主题val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1").load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]Structured Streaming 支持多种输出方式,包括控制台输出、内存输出、文件输出、数据源输出等。下面是将数据写入到 Parquet 文件中的例子:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()// 从 socket 中读取数据val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()// 将数据写入到 Parquet 文件中lines.writeStream.format("parquet").option("path", "path/to/output/dir").option("checkpointLocation", "path/to/checkpoint/dir").start()每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。
Output mode 指定了数据写入输出接收器的方式。Structured Streaming 支持以下三种 output mode:
| Append | 只将流 DataFrame/Dataset 中的新行写入接收器。 |
| Complete | 每当有更新时,将流 DataFrame/Dataset 中的所有行写入接收器。 |
| Update | 每当有更新时,只将流 DataFrame/Dataset 中更新的行写入接收器。 |
Output sink 指定了数据写入的位置。Structured Streaming 支持多种输出接收器,包括文件接收器、Kafka 接收器、Foreach 接收器、控制台接收器和内存接收器等。下面是一些使用 Scala 语言将数据写入到不同输出接收器中的例子:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()// 从 socket 中读取数据val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()// 将数据写入到 Parquet 文件中lines.writeStream.format("parquet").option("path", "path/to/output/dir").option("checkpointLocation", "path/to/checkpoint/dir").start()// 将数据写入到 Kafka 中//selectExpr 是一个 DataFrame 的转换操作,它允许你使用 SQL 表达式来选择 DataFrame 中的列。//selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 表示选择 key 和 value 列,并将它们的类型转换为字符串类型。//这是因为 Kafka 接收器要求数据必须是字符串类型或二进制类型。lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "topic1").start()// 将数据写入到控制台中lines.writeStream.format("console").start()// 将数据写入到内存中lines.writeStream.format("memory").queryName("tableName").start()下面是用Structured Streaming实现PV,UV统计的例子,我们来感受实战下:
import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.functions._object PVUVExample {def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("PVUVExample").getOrCreate()import spark.implicits._// 假设我们有一个包含用户ID和访问的URL的输入流val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val data = lines.as[String].map(line => {val parts = line.split(",")(parts(0), parts(1))}).toDF("user", "url")// 计算PVval pv = data.groupBy("url").count().withColumnRenamed("count", "pv")val pvQuery = pv.writeStream.outputMode("complete").format("console").start()// 计算UVval uv = data.dropDuplicates().groupBy("url").count().withColumnRenamed("count", "uv")val uvQuery = uv.writeStream.outputMode("complete").format("console").start()pvQuery.awaitTermination()uvQuery.awaitTermination()}}这段代码演示了如何使用Structured Streaming对数据进行PV和UV统计。它首先从一个socket源读取数据,然后使用groupBy和count对数据进行PV统计,最后使用dropDuplicates、groupBy和count对数据进行UV统计。
假设我们在本地启动了一个socket服务器,并向其发送以下数据:
user1,http://example.com/page1user2,http://example.com/page1user1,http://example.com/page2user3,http://example.com/page1user2,http://example.com/page2user3,http://example.com/page2那么程序将输出以下结果:
-------------------------------------------Batch: 0-------------------------------------------+--------------------+---+| url| pv|+--------------------+---+|http://example.co...| 3||http://example.co...| 3|+--------------------+---+-------------------------------------------Batch: 0-------------------------------------------+--------------------+---+| url| uv|+--------------------+---+|http://example.co...| 2||http://example.co...| 3|+--------------------+---+总之,Spark是一个强大的大数据处理框架,它具有高性能、易用性和灵活性等优点。希望本文能够帮助你入门Spark,并在实际应用中发挥它的强大功能。如果你想深入学习Spark,可以参考官方文档和相关书籍,也可以加入Spark社区,与其他开发人员交流经验。