18 January 2019
From: https://www.bi4all.pt/en/news/en-blog/apache-spark-best-practices/
Apache Spark – Best Practices
1.1. Introduction
Apache Spark is a Big Data tool whose objective is to process large datasets in a parallel and distributed way. It extends the programming model already known from Apache Hadoop – MapReduce – and facilitates the development of applications that process large data volumes. Spark delivers far superior performance compared to Hadoop, in some cases running almost 100x faster.
Another advantage is that all components work integrated within the same framework, such as Spark Streaming, Spark SQL and GraphX, unlike Hadoop, where it is necessary to use tools that integrate with it but are distributed separately, like Apache Hive. Another important aspect is that Spark can be programmed in four different languages: Java, Scala, Python and R.
Spark has several components for different types of processing, all built on Spark Core, the component that offers the basic processing functions such as map, reduce, filter and collect:
- Spark Streaming, for real-time processing
- GraphX, which performs processing on graphs
- Spark SQL, for querying and processing data in Spark with SQL
- MLlib, a machine learning library with different algorithms for tasks such as clustering
Figure 1. Apache Spark components
1.2. Spark Architecture
This section explains the primary functionalities of Spark Core. First, the application architecture is presented, followed by the basic concepts of the programming model for dataset processing. A Spark application's architecture consists of three major parts:
- The Driver Program is the main application; it manages the creation and execution of the processing defined by the programmers;
- The Cluster Manager is an optional component, needed only when Spark is executed in a distributed way. It is responsible for administering the machines that will be used as workers;
- The Workers are the machines that execute the tasks sent by the Driver Program. If Spark runs in local mode, the same machine plays both the Driver Program and Worker roles. Fig. 2 shows the Spark architecture and its major components.
Figure 2. Spark Architecture
Apart from the architecture, it is important to know the principal components of Spark's programming model. There are three fundamental concepts that are used in every application:
- Resilient Distributed Datasets (RDD): they abstract a dataset distributed across the cluster, usually kept in main memory. They can be stored in traditional file systems, in HDFS (Hadoop Distributed File System) and in NoSQL databases like Cassandra and HBase. RDDs are the primary objects in Spark's programming model because they are where the data is processed.
- Operations: they represent transformations or actions that are performed on an RDD. A Spark program is normally defined as a sequence of transformations and actions performed on a dataset.
- Spark Context: the context is the object that connects Spark to the program being developed. It can be accessed as a variable in the program, which uses it to reach Spark's resources.
1.3. Tuning and best practices
Because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked by any resource in the cluster: CPU, network bandwidth or memory. Most often, if the data fits in memory, the bottleneck is network bandwidth, but sometimes further tuning is required. In this section, we present some techniques for tuning Apache Spark for optimal efficiency:
1.3.1. Do not collect large RDDs
A memory exception will be thrown if the dataset is too large to fit in memory when doing RDD.collect(). Functions like take or takeSample can be used instead to retrieve only a bounded number of elements.
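A minimal sketch of the idea, assuming an existing SparkContext sc and a hypothetical input path:

// collect() would bring the whole dataset to the driver and can throw an OutOfMemoryError
val rdd = sc.textFile("hdfs:///data/events.txt")
val firstHundred = rdd.take(100)                                       // only the first 100 elements
val randomSample = rdd.takeSample(withReplacement = false, num = 100)  // a random sample of 100 elements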
1.3.2. Do not use count() when you do not need to return the exact number of rows
Rather than returning the exact number of rows in the RDD, you can check whether it is empty with a simple if (rdd.take(1).length == 0).
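For example, on an rdd like the one in the previous sketch (recent Spark versions also provide rdd.isEmpty(), which encapsulates the same idea):

// take(1) stops as soon as one element is found, instead of scanning the whole dataset like count()
if (rdd.take(1).length == 0) {
  println("The RDD is empty")
}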
1.3.3. Avoid groupByKey on large datasets
There are two functions, reduceByKey and groupByKey, and both produce the same results. However, the latter transfers the entire dataset across the network, while the former computes local sums for each key in each partition and combines those local sums into larger sums after shuffling.
With reduceByKey, pairs with the same key on the same machine are combined before the data is shuffled. On the other hand, when calling groupByKey all the key-value pairs are shuffled around, which means a lot of unnecessary data being transferred over the network.
To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes out the data to disk one key at a time – so if a single key has more key-value pairs than can fit in memory, an out of memory exception occurs.
Shuffling can be a great bottleneck, and having many big HashSets (depending on your dataset) can also be a problem. However, you are more likely to have a large amount of RAM than a fast network, so keeping reads and writes local to each machine is faster than moving the data across the cluster.
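The classic word count makes the difference concrete; a sketch assuming an existing words: RDD[String]:

val pairs = words.map(word => (word, 1))

// reduceByKey: partial sums are computed on each partition before the shuffle
val wordCounts = pairs.reduceByKey(_ + _)

// groupByKey: every (word, 1) pair is shuffled, and the sums are only computed afterwards
val wordCountsGrouped = pairs.groupByKey().mapValues(_.sum)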
Here are more functions to prefer over groupByKey; short sketches of both follow the list:
- combineByKey can be used when you are combining elements but your return type differs from your input value type.
- foldByKey merges the values for each key using an associative function and a neutral “zero value”.
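Hedged sketches of both, assuming a hypothetical scores: RDD[(String, Double)]:

// combineByKey: per-key average, where the accumulator type (sum, count) differs from the value type Double
val sumCount = scores.combineByKey(
  (v: Double) => (v, 1),                                                // createCombiner
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),          // mergeValue
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)    // mergeCombiners
)
val averages = sumCount.mapValues { case (sum, count) => sum / count }

// foldByKey: per-key sum using a neutral "zero value" of 0.0
val totals = scores.foldByKey(0.0)(_ + _)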
1.3.4. Avoid the flatmap-join-groupby pattern
When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups.
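A sketch with two hypothetical RDDs keyed by customer id, assuming an existing SparkContext sc:

val orders   = sc.parallelize(Seq((1, "book"), (1, "pen"), (2, "lamp")))
val payments = sc.parallelize(Seq((1, 19.90), (2, 35.00)))

// cogroup keeps both sides grouped by key: RDD[(Int, (Iterable[String], Iterable[Double]))]
val grouped = orders.cogroup(payments)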
1.3.5. Use coalesce to decrease the number of partitions
Use the coalesce function instead of repartition when decreasing the number of partitions of an RDD. Coalesce is useful because it avoids a full shuffle: it uses the existing partitions to minimize the amount of data that is moved.
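For example, assuming rdd currently has 100 partitions:

val fewerPartitions = rdd.coalesce(10)      // merges existing partitions, no full shuffle
val reshuffled      = rdd.repartition(10)   // equivalent to coalesce(10, shuffle = true), forces a full shuffle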
1.3.6. When to use broadcast variables
Spark computes a task's closure before running the task on the available executors. The closure consists of the variables and methods that must be visible for the executor to perform its computations on the RDD. If a huge array is accessed from Spark closures, e.g. some reference data, this array is shipped to each Spark node with the closure. For instance, on a 10-node cluster with 100 partitions (10 partitions per node), this array will be distributed at least 100 times (10 times to each node). If a broadcast variable is used instead, it is distributed once per node using an efficient P2P protocol. Once the value is broadcast to the nodes, it cannot be changed, so that each node is guaranteed to have exactly the same copy of the data; a modified value might otherwise be sent to another node later and give unexpected results.
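A minimal sketch, assuming an existing SparkContext sc and a hypothetical lookup table used as reference data:

val countryByCode = Map("PT" -> "Portugal", "ES" -> "Spain")

// sc.broadcast ships the value once per node; every task reads it through .value
val broadcastCountries = sc.broadcast(countryByCode)

val codes = sc.parallelize(Seq("PT", "ES", "PT"))
val names = codes.map(code => broadcastCountries.value.getOrElse(code, "unknown"))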
1.3.7. Joining a large and a small RDD
If the small RDD is small enough to fit into the memory of each worker, we can turn it into a broadcast variable and turn the entire operation into a so-called map-side join for the larger RDD; this way, the larger RDD does not need to be shuffled at all. This can easily be the case when the smaller RDD is a dimension table.
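A sketch of such a map-side join, assuming a hypothetical small dimension RDD smallRdd: RDD[(Int, String)] and a large fact RDD largeRdd: RDD[(Int, Double)]:

// The small RDD is collected to the driver and broadcast to every worker
val smallAsMap = sc.broadcast(smallRdd.collectAsMap())

// The join becomes a simple map over the large RDD, so the large RDD is never shuffled
val joined = largeRdd.flatMap { case (key, value) =>
  smallAsMap.value.get(key).map(dimension => (key, (value, dimension)))
}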
1.3.8. Joining a large and a medium size RDD
If the medium-sized RDD does not fit fully into memory but its key set does, it is possible to exploit this. Since a join discards all elements of the larger RDD that do not have a matching partner in the medium-sized RDD, the medium RDD's key set can be used to filter the larger RDD before the shuffle. If a significant number of entries is discarded this way, the resulting shuffle will need to transfer far less data. It is important to note that the efficiency gain here depends on the filter operation actually reducing the size of the larger RDD.
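A sketch of this pre-filtering, assuming hypothetical pair RDDs largeRdd and mediumRdd keyed by the same id:

// Collect only the medium RDD's keys (they fit in memory) and broadcast them
val mediumKeys = sc.broadcast(mediumRdd.keys.collect().toSet)

// Drop large-RDD entries that could never find a join partner, before any shuffle happens
val reducedLarge = largeRdd.filter { case (key, _) => mediumKeys.value.contains(key) }
val joined = reducedLarge.join(mediumRdd)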
1.3.9. Use the right level of parallelism
Unless the level of parallelism for each operation is high enough, the cluster will not be fully utilized. Spark automatically sets the number of partitions of an input file according to its size, and for distributed shuffles it takes the number of partitions from the parent RDDs. Spark creates one partition for each block of the file in HDFS (64 MB by default). When creating an RDD, it is possible to pass a second argument with the number of partitions, e.g.:
val rdd = sc.textFile("file.txt", 5)
The statement above creates an RDD from textFile with 5 partitions. The RDD should be created with a number of partitions equal to the number of cores in the cluster, so that all partitions are processed in parallel and resources are used evenly.
A DataFrame shuffle creates a number of partitions equal to the spark.sql.shuffle.partitions parameter, whose default value is 200.
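The parameter can be adjusted to match the cluster, for example (assuming an existing SparkSession spark and a hypothetical cluster with 40 cores):

spark.conf.set("spark.sql.shuffle.partitions", "40")
// or at submit time: --conf spark.sql.shuffle.partitions=40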
1.3.10. How to estimate the number of partitions and the executors' and driver's parameters (YARN cluster mode)
yarn.nodemanager.resource.memory-mb = ((node's RAM in GB – 2 GB) * 1024) MB
Total number of the node's cores = yarn.nodemanager.resource.cpu-vcores
> Executor’s parameters (Worker Node):
- Executors (VMs) per node = ((total number of the node's cores) / 5) – 1
5 is the upper bound for cores per executor because more than 5 cores per executor can degrade HDFS I/O throughput
If the total number of the node's cores is less than or equal to 8, we divide it by 2 instead
If the total number of the node's cores is equal to 1, there is 1 executor per node
- numExecutors (number of executors to launch for this session) = number of nodes * executors per node
The Driver is included in executors.
- executorCores (number of cores to use for each executor) = (total number of the node's cores – 5) / executors per node
- executorMemory (amount of memory to use per executor process) = (yarn.nodemanager.resource.memory-mb – 1024) / (executors per node + 1)
For the executorMemory, the memory allocation is based on the algorithm:
Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction, where memoryFraction = spark.storage.memoryFraction and safetyFraction = spark.storage.safetyFraction.
The default values of spark.storage.memoryFraction and spark.storage.safetyFraction are 0.6 and 0.9 respectively, so the real executorMemory is:
executorMemory = ((yarn.nodemanager.resource.memory-mb – 1024) / (executors per node + 1)) * memoryFraction * safetyFraction.
> Driver’s parameters (Application Master Node):
- driverCores = executorCores
- driverMemory = executorMemory
Consider the following example:
3 Worker Nodes and one Application Master Node, each with 16 vCPUs and 52 GB of memory
yarn.nodemanager.resource.memory-mb = (52 – 2) * 1024 = 51200 MB
yarn.scheduler.maximum-allocation-mb = 20830 MB (must be greater than executorMemory)
> Executor’s params (Worker Node):
- Executors per node = (16 / 5) – 1 = 2
- numExecutors = 2 * 4 = 8
- executorCores = (16 – 5) / 2 = 5
- executorMemory = ((51200 – 1024) / 3) * 0.6 * 0.9 = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB
> Driver’s params (Application Master Node):
- driverCores = 5
- driverMemory = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB
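These values map directly onto spark-submit parameters; a sketch using the rounded figures from this example (9031.68 MB rounded down to 9 GB) and a hypothetical application jar:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 8 \
  --executor-cores 5 \
  --executor-memory 9G \
  --driver-cores 5 \
  --driver-memory 9G \
  my-spark-app.jar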
João Gaspar
Consultant