
Apache Spark – Best Practices

 18 January 2019

From: https://www.bi4all.pt/en/news/en-blog/apache-spark-best-practices/



1.1. Introduction

Apache Spark is a Big Data tool whose objective is to process large datasets in a parallel and distributed way. It extends the programming model already known from Apache Hadoop – MapReduce – and facilitates the development of applications that process large data volumes. Spark delivers much better performance than Hadoop, in some cases close to 100x faster.

Another advantage is that all components work integrated within the same framework, such as Spark Streaming, Spark SQL and GraphX, unlike Hadoop, where it is necessary to use tools that integrate with it but are distributed separately, like Apache Hive. Another important aspect is that Spark can be programmed in four different languages: Java, Scala, Python and R.

Spark has several components for different types of processing, all built on Spark Core, the component that offers the basic processing functions such as map, reduce, filter and collect:

  • Spark Streaming, for real-time processing
  • GraphX, which performs processing on graphs
  • Spark SQL, for using SQL to query and process data in Spark
  • MLlib, a machine learning library with different algorithms for several tasks such as clustering

 


Figure 1. Apache Spark components

 

1.2. Spark Architecture

This section explains the primary functionality of Spark Core. First, the application architecture is presented, followed by the basic concepts of the programming model for dataset processing. The Spark application architecture consists of three major parts:

  • The Driver Program is the main application; it manages the creation and execution of the processing defined by the programmer;
  • The Cluster Manager is an optional component, only necessary if Spark is executed in a distributed way. It is responsible for administering the machines that will be used as workers;
  • The Workers are the ones that execute the tasks sent by the Driver Program. If Spark is executed locally, the same machine plays both the Driver Program and Worker roles. Fig. 2 shows the Spark architecture and its major components.


Figure 2. Spark Architecture

Apart from the architecture, it is important to know the principal components of Spark's programming model. There are three fundamental concepts used in every application; a minimal code sketch combining them follows the list:

  • Resilient Distributed Datasets (RDD): they abstract a dataset distributed across the cluster, usually kept in primary memory. They can be stored in traditional file systems such as HDFS (Hadoop Distributed File System) and in some NoSQL databases like Cassandra and HBase. RDDs are the primary objects in Spark's programming model because they are where the data is processed.
  • Operations: they represent transformations or actions performed on an RDD. A Spark program is normally defined as a sequence of transformations and actions performed on a dataset.
  • Spark Context: the context is the object that connects Spark to the program being developed. It can be accessed as a variable in a program that uses its resources.
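
A minimal sketch of how these three concepts fit together, assuming the Scala API and a local master (the names are only illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object BasicExample {
  def main(args: Array[String]): Unit = {
    // Spark Context: connects the application (Driver Program) to Spark.
    val conf = new SparkConf().setAppName("basic-example").setMaster("local[*]")
    val sc   = new SparkContext(conf)

    // RDD: a dataset distributed across the cluster (here, 4 partitions).
    val numbers = sc.parallelize(1 to 100, numSlices = 4)

    // Operations: transformations (lazy) followed by an action that
    // triggers the actual computation.
    val evens = numbers.filter(_ % 2 == 0).map(_ * 10)
    println(evens.count())

    sc.stop()
  }
}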

 

1.3. Tuning and best practices

Because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked by any resource in the cluster: CPU, network bandwidth, or memory. Most often, if the data fits in memory, the bottleneck is network bandwidth, but sometimes additional tuning is still required. In this section, we will show some techniques for tuning Apache Spark for optimal efficiency:

1.3.1. Do not collect large RDDs

A memory exception will be thrown if the dataset is too large to fit in memory when doing an RDD.collect(). Functions like take or takeSample can be used instead to retrieve only a bounded number of elements.
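
For example (a sketch assuming an existing SparkContext sc; the RDD is hypothetical):

// Hypothetical large RDD.
val bigRdd = sc.parallelize(1 to 100000000)

// bigRdd.collect() would pull every element to the driver and can run out of
// memory; fetch only a bounded number of elements instead:
val firstTen  = bigRdd.take(10)                                // first 10 elements
val sampleTen = bigRdd.takeSample(withReplacement = false, 10) // 10 random elements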

1.3.2. Do not use count() when you do not need to return the exact number of rows

Rather than returning the exact number of rows in the RDD, you can check whether it is empty with a simple if (rdd.take(1).length == 0).
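
A minimal sketch, assuming an existing RDD named rdd:

// Cheap emptiness check: at most the first element is fetched.
if (rdd.take(1).length == 0) {
  println("RDD is empty")
}
// Spark also provides rdd.isEmpty(), which performs the same kind of check.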

1.3.3. Avoid groupByKey on large datasets

The two functions reduceByKey and groupByKey can produce the same results for an aggregation. However, the latter transfers the entire dataset across the network, while the former computes local sums for each key in each partition and combines those local sums into larger sums after shuffling.

Below is a diagram to understand what happens with reduceByKey. There is more than one pair on the same machine with the same key being combined before the data is shuffled.

[Diagram: reduceByKey – pairs with the same key are combined on each machine before the shuffle]

On the other hand, when calling groupByKey all the key-value pairs are shuffled around, which means a lot of unnecessary data is transferred over the network.

To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes out the data to disk one key at a time – so if a single key has more key-value pairs than can fit in memory, an out of memory exception occurs.

[Diagram: groupByKey – all key-value pairs are shuffled across the network]

Shuffling can be a great bottleneck. Having many big HashSets (depending on your dataset) could also be a problem. However, plentiful RAM is more common than a fast network, so keeping the data local and in memory results in faster reads and writes than moving it across distributed machines.
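
As an illustration, a word-count sketch contrasting the two functions (the input path is hypothetical):

// Hypothetical input path.
val words = sc.textFile("hdfs:///data/input.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Preferred: local sums are computed per partition before the shuffle.
val countsReduce = words.reduceByKey(_ + _)

// Produces the same result, but every (word, 1) pair crosses the network
// before being summed.
val countsGroup = words.groupByKey().mapValues(_.sum)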

Here are more functions to prefer over groupByKey:

  • combineByKey can be used when you are combining elements but your return type differs from your input value type.
  • foldByKey merges the values for each key using an associative function and a neutral “zero value”.

1.3.4. Avoid the flatMap-join-groupBy pattern

When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups.
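
A small sketch of the idea, with hypothetical RDDs keyed by a user id:

import org.apache.spark.rdd.RDD

val clicks = sc.parallelize(Seq((1, "home"), (1, "cart"), (2, "home")))
val orders = sc.parallelize(Seq((1, 9.99), (2, 24.50)))

// One cogroup keeps both sides grouped per key,
// avoiding the flatMap-join-groupBy round trip.
val grouped: RDD[(Int, (Iterable[String], Iterable[Double]))] = clicks.cogroup(orders)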

1.3.5. Use coalesce when decreasing the number of partitions

Use the coalesce function instead of repartition when decreasing the number of partitions of an RDD. Coalesce is useful because it avoids a full shuffle; it uses the existing partitions to minimize the amount of data that is moved.
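
For example (a sketch with a hypothetical input; the partition counts are chosen only for illustration):

val rdd = sc.textFile("hdfs:///data/events", minPartitions = 1000)

// Decreasing partitions: coalesce merges existing partitions without a full shuffle.
val narrowed = rdd.coalesce(100)

// repartition(100) reaches the same partition count,
// but shuffles the entire dataset to get there.
val reshuffled = rdd.repartition(100)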

1.3.6. When to use broadcast variables

Spark computes the task’s closure before running each task on the available executors. The closure consists of the variables and methods that must be visible for the executor to perform its computations on the RDD. If a huge array is accessed from Spark closures, e.g. some reference data, this array will be shipped to each Spark node with the closure. For instance, if we have a 10-node cluster with 100 partitions (10 partitions per node), this array will be distributed at least 100 times (10 times to each node). If a broadcast variable is used instead, it will be distributed once per node using an efficient P2P protocol. Once the value is broadcast to the nodes, it cannot be changed, to make sure each node has the exact same copy of the data; otherwise a modified value might be sent to another node later, which would give unexpected results.
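
A minimal sketch of the pattern, with hypothetical reference data:

// Reference data that every task needs to read.
val countryNames = Map("PT" -> "Portugal", "ES" -> "Spain", "FR" -> "France")

// Without broadcast, countryNames would be serialized into every task's closure;
// broadcast ships it once per node, and it is read-only afterwards.
val bcNames = sc.broadcast(countryNames)

val visits = sc.parallelize(Seq(("PT", 10), ("FR", 3)))
val named  = visits.map { case (code, n) => (bcNames.value.getOrElse(code, "unknown"), n) }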

1.3.7. Joining a large and a small RDD

If the small RDD is small enough to fit into the memory of each worker, we can turn it into a broadcast variable and turn the entire operation into a so-called map-side join for the larger RDD; this way, the larger RDD does not need to be shuffled at all. This situation easily arises when the smaller RDD is a dimension table.
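
A sketch of the map-side join, assuming the dimension table is small enough to collect to the driver (the data is hypothetical):

// Hypothetical small dimension table: productId -> category.
val bcDim = sc.broadcast(
  sc.parallelize(Seq((1, "books"), (2, "music"))).collectAsMap())

// Large fact RDD (hypothetical).
val facts = sc.parallelize(Seq((1, 19.90), (2, 7.50), (1, 5.00)))

// Each partition of the large RDD looks the key up locally,
// so the large RDD is never shuffled.
val joined = facts.flatMap { case (id, amount) =>
  bcDim.value.get(id).map(category => (id, (amount, category)))
}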

1.3.8. Joining a large and a medium size RDD

If the medium-size RDD does not fit fully into memory but its key set does, it is possible to exploit this. As a join will discard all elements of the larger RDD that do not have a matching partner in the medium-size RDD, the medium RDD’s key set can be used to filter the larger RDD before the shuffle. If a significant number of entries gets discarded this way, the resulting shuffle will need to transfer much less data. It is important to note that the efficiency gain here depends on the filter operation actually reducing the size of the larger RDD.
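
A sketch of the pre-filter, with hypothetical RDDs:

val large  = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")))
val medium = sc.parallelize(Seq((1, 1.0), (3, 3.0)))

// The medium RDD's keys fit in memory, so collect them and broadcast the set.
val keySet = sc.broadcast(medium.map(_._1).collect().toSet)

// Rows without a matching key would be dropped by the join anyway,
// so discard them before the shuffle.
val filtered = large.filter { case (k, _) => keySet.value.contains(k) }
val joined   = filtered.join(medium)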

1.3.9. Use the right level of parallelism

Unless the level of parallelism for each operation is high enough, clusters will not be fully utilized. Spark automatically sets the number of partitions of an input file according to its size, and does the same for distributed shuffles. Spark creates one partition for each block of the file in HDFS, with 64 MB per block by default. When creating an RDD it is possible to pass a second argument with the desired number of partitions, e.g.:

val rdd = sc.textFile("file.txt", 5)

The above statement creates an RDD over the text file with 5 partitions. The RDD should be created with a number of partitions equal to the number of cores in the cluster, so that all partitions are processed in parallel and the resources are used equally.

When a DataFrame is shuffled (e.g. for joins or aggregations), the number of resulting partitions is given by the spark.sql.shuffle.partitions parameter, whose default value is 200.
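
The parameter can be tuned per session; a sketch (the value 64 is only illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-tuning").getOrCreate()

// DataFrame shuffles (joins, aggregations) will now produce 64 partitions
// instead of the default 200.
spark.conf.set("spark.sql.shuffle.partitions", "64")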

1.3.10. How to estimate the number of partitions and the executor’s and driver’s parameters (YARN cluster mode)

yarn.nodemanager.resource.memory-mb = ((Node’s RAM GB – 2 GB) * 1024) MB

Total number of the Node’s cores = yarn.nodemanager.resource.cpu-vcores

> Executor’s parameters (Worker Node):

  • Executor (VM) x Node = ((Total number of the Node’s cores) / 5) – 1

5 is the upper bound for cores per executor, because more than 5 cores per executor can degrade HDFS I/O throughput.

If the total number of the Node’s cores is less than or equal to 8, we divide it by 2 instead.

If the total number of the Node’s cores is equal to 1, Executor x Node is equal to 1.

  • numExecutors (Number of executors to launch for this session) = number of Nodes * Executor (VM) x Node

The Driver is included in the number of executors.

  • executorCores (Number of cores to use for each executor) = (total number of the Node’s cores – 5) / Executor x Node
  • executorMemory (Amount of memory to use per executor process) = (yarn.nodemanager.resource.memory-mb – 1024) / (Executor (VM) x Node + 1)

For the executorMemory the memory allocation is based on the algorithm:

Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction, where memoryFraction = spark.storage.memoryFraction and safetyFraction = spark.storage.safetyFraction.

The default values of spark.storage.memoryFraction and spark.storage.safetyFraction are respectively 0.6 and 0.9 so the real executorMemory is:

executorMemory = ((yarn.nodemanager.resource.memory-mb – 1024) / (Executor (VM) x Node +1)) * memoryFraction * safetyFraction.

> Driver’s parameters (Application Master Node):

  • driverCores = executorCores
  • driverMemory = executorMemory

Consider the following example (a short code sketch that reproduces the arithmetic follows the computed values):

3 Worker Nodes and one Application Master Node, each with 16 vCPUs and 52 GB of memory

yarn.nodemanager.resource.memory-mb = (52 – 2) * 1024 = 51200 MB

yarn.scheduler.maximum-allocation-mb = 20830 MB (must be greater than executorMemory)

> Executor’s params (Worker Node):

  • Executor x Node = (16 / 5) – 1 = 2
  • numExecutors = 2 * 4 = 8
  • executorCores = (16 – 5) / 2 = 5
  • executorMemory = ((51200 – 1024) / 3) * 0.6 * 0.9 = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB

> Driver’s params (Application Master Node):

  • driverCores = 5
  • driverMemory = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB
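
A short Scala sketch that reproduces the arithmetic above, using the node counts and sizes of this example:

// 3 Worker Nodes + 1 Application Master Node, 16 vCPUs and 52 GB each.
val nodes        = 4
val coresPerNode = 16
val ramGbPerNode = 52

val nodeManagerMemMb = (ramGbPerNode - 2) * 1024            // 51200 MB
val executorsPerNode = coresPerNode / 5 - 1                 // (16 / 5) - 1 = 2

val numExecutors  = nodes * executorsPerNode                // 8 (the Driver is included)
val executorCores = (coresPerNode - 5) / executorsPerNode   // (16 - 5) / 2 = 5

val memoryFraction = 0.6
val safetyFraction = 0.9
val executorMemoryMb =
  (nodeManagerMemMb - 1024).toDouble / (executorsPerNode + 1) * memoryFraction * safetyFraction
// = 50176 / 3 * 0.6 * 0.9 ≈ 9031.68 MB

println(s"numExecutors=$numExecutors, executorCores=$executorCores, executorMemoryMb=$executorMemoryMb")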
João Gaspar
Consultant

André Caetano
Associate Consultant
