Spark环境搭建(Windows)

简介

为了在有限的资源上学习大数据处理与分析技术,借鉴Linux以及部分网上的教程,在Windows10平台搭建Spark环境。本文将简单记录搭建流程以及其中遇到的坑。

官网

https://spark.apache.org/

https://spark.apache.org/docs/latest/index.html

Spark概述

一种基于内存的快速、通用、可扩展的大数据分析计算引攀。

Apache Spark 是一个用于大规模数据处理的统一分析引擎。它提供了 Java、Scala、Python 和 R 中的高级 API,以及支持通用执行图的优化引擎。它还支持一组丰富的更高级别的工具,包括用于 SQL 和结构化数据处理的 Spark SQL、用于 pandas 工作负载的 Spark 上的 Pandas API(建议直接使用 PySpark )、用于机器学习的 MLlib、用于图形处理的 GraphX、用于增量计算和流处理的 Structured Streaming

高级工具

https://spark.apache.org/sql/

https://spark.apache.org/streaming/

https://spark.apache.org/mllib/

https://spark.apache.org/graphx/

层次结构

应 用 层 :Spark SQL、Spark Streaming、MLlib、GraphX

处理引擎:Spark Core

资源管理:Standalone、Yarm、K8s

数据存储:HDFS、Hive、Hbase、Carbon、Kafka

特点

Spark是通过借鉴Hadoop MapReduce的特点发展而来的,相对于传统的MapReduce任务,具有以下几个特点:

速度快:基于内存计算,计算速度最快可以达到MapReduce的100倍

易用性高:Spark允许Java、Scala、Python、R等不同语言的开发者在自己熟悉的语言下进行工作

通用性强:提供统一的解决方案,适用于批处理、流处理、机器学习和图运算等多种特性

无处不在:Spark可以独立运行,也可以运行在Yarn、K8s、等调度框架中

核心概念

  • RDD:弹性分布式数据集,一个只读、可分区的分布式数据集,是Spark最核心的特性,所有的Spark应用的都是基于RDD来完成的
  • Transform算子:完成RDD转换的操作,将旧的RDD转换为新的RDD,常用的算子有map、filter、join、union等,由于RDD的只读特性,每个Transform操作都会产生一个新的RDD
  • Action算子:处理RDD结果的算子,常用的有count、collect、foreach、save等操作,每个action都会生成一个Job

Spark应用其实是对RDD的处理:首先将数据转换为RDD;然后利用Transform算子转换RDD、最后使用Action算子处理结果RDD

应用结构

用户提价Spark应用后,Spark会将其解析成APP-Job-Stage-Task的内部结构,并执行

  • Application:与用于提交的任务对应(应用程序)。
  • Job:与功能对应,一个Job可以完成一个功能,每个Application由一个或多个Job组成。(简单理解为当执行RDD的一个Action时就会生成一个Job)
  • Stage:根据Job中数据流依赖关系拆分而成,一个Job可以拆分成一个或多个Stage。(依据RDD间的Dependency划分stage;一个Job会被切分为一个或多个Stage;由多个任务组成的并行计算)
  • Task:Task是Spark计算的最小单元,一个Stage可以由一个或多个Task组成。(每个Stage由多个MapTask完成;若经过shuffle后RDD的Partition改变,则后继Stage的MapTask数会改变;Task分为ShuffleMapTask和ResultTask)

部署

集群模式概述

https://spark.apache.org/docs/latest/cluster-overview.html

词汇表: https://spark.apache.org/docs/latest/cluster-overview.html#glossary

Spark目前支持三种集群管理器:

更多

Spark配置

https://spark.apache.org/docs/latest/configuration.html

监控

https://spark.apache.org/docs/latest/monitoring.html

常用JavaAPI

https://spark.apache.org/docs/latest/sql-programming-guide.html SparkSQL指南

https://spark.apache.org/docs/latest/api/java/index.html

SparkSession

SparkSession.Builder

RuntimeConfig

Dataset

DataFrameWriter

DataFrameReader

functions

Column

RDD、Dataframe、Dataset区别

安装Java

安装路径不要有空格

安装Hadoop

下载: https://hadoop.apache.org/releases.html

解压到某个不含空格的路径下,如 D:\Program\hadoop

添加环境变量 HADOOP_HOME,值为安装路径 D:\Program\hadoop

添加到 Path 路径:%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin

进入Hadoop的配置目录 etc\hadoop,打开文件 hadoop-env.cmd,修改Java安装路径 set JAVA_HOME=D:\Program\Java\64\jdk1.8.0_191

下载对应版本的 winutils,把下载到的 bin 文件夹覆盖到Hadoop安装目录的bin文件夹,确保其中含有 winutils.exe 文件

​ Github:https://github.com/cdarlint/winutils

略 – 新建 D:\Program\hive 文件夹,命令行导航到 Hadoop 的 bin 目录,执行授权操作 winutils.exe chmod -R 777 D:\Program\hive

最后在命令行输入 hadoop version 测试是否安装成功

1
2
3
4
5
6
7
C:\Users>hadoop version
Hadoop 3.3.5
Source code repository https://github.com/apache/hadoop.git -r 706d88266abcee09ed78fbaa0ad5f74d818ab0e9
Compiled by stevel on 2023-03-15T15:56Z
Compiled with protoc 3.7.1
From source with checksum 6bbd9afcf4838a0eb12a5f189e9bd7
This command was run using /D:/Program/hadoop/share/hadoop/common/hadoop-common-3.3.5.jar

安装Spark

Java项目:“local[*]”本地运行时,只需安装Hadoop;要在Spark独立群集“Spark://master:7077”上运行时,需要搭建Spark单机模式或集群模式并运行。

下载: https://spark.apache.org/downloads.html

解压到某个不含空格的路径下,如 D:\Program\spark

添加环境变量 SPARK_HOME,值为安装路径 D:\Program\spark

添加到 Path 路径:%SPARK_HOME%\bin;%SPARK_HOME%\sbin

进入Spark的配置目录 conf,复制一个 log4j.properties.template 文件并命名为log4j.properties

同样在Spark的配置目录 conf,复制一个 spark-env.sh.template 文件并命名为 spark-env.sh,打开并增加一行代码 SPARK_LOCAL_IP = 127.0.0.1

验证Spark安装成功:

打开命令行,运行 spark-shell

此时进入 http://localhost:4040/ 可以看到Spark的Web界面

Java示例

spark安装包包含Java示例 D:\Program\spark\examples

Spark调优(参考)

主要性能瓶颈:Stage过长、Shuffler过多、数据分布不均衡、数据分片不合理

缓存调优

  • 避免创建重复的RDD
  • 尽可能复用同一个RDD
  • 合理使用RDD持久化(cache方法)

shuffler调优

​ 避免使用shuffler算子
​ 使用map-side预聚合的shuffler操作–>减少shuffler。例如GroupByKey和ReduceByKey优先使用后者

参数调优-资源参数spark2

https://blog.51cto.com/u_15127627/3694722

num-executors

参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-memory

参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
参数调优建议:每个Executor进程的内存设置4G8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/31/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

executor-cores

参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
参数调优建议:Executor的CPU core数量设置为24个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/31/2左右比较合适,也是避免影响其他同学的作业运行。

driver-memory

参数说明:该参数用于设置Driver进程的内存。
参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

spark.default.parallelism

参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
参数调优建议:Spark作业的默认task数量为5001000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的23倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

spark.storage.memoryFraction

spark3:spark.memory.fraction、spark.memory.storageFraction

参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction(spark2)

参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

total-executor-cores

参数说明:Total cores for all executors.

其他调优

使用高性能的算子 mapPartitions/foreachPartitions

广播大变量

使用Kryo优化序列化性能