Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What are the basics of Spark performance optimization

2024-05-15 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article focuses on "what are the basics of Spark performance optimization". Interested friends may wish to take a look. The method introduced in this paper is simple, fast and practical. Let's let the editor take you to learn the basics of Spark performance optimization.

# Development tuning

The first step in Spark performance optimization is to pay attention to and apply some basic principles of performance optimization in the process of developing Spark jobs. Development and tuning is to let you understand the following basic Spark development principles, including: RDD lineage design, reasonable use of operators, optimization of special operations, and so on. In the process of development, we should pay attention to the above principles all the time, and apply these principles flexibly to our own Spark assignments according to the specific business and actual application scenarios.

# # principle 1: avoid creating duplicate RDD

Generally speaking, when we develop a Spark job, we first create an initial RDD; based on a data source (such as a Hive table or HDFS file), then perform an operator operation on this RDD, then get the next RDD;, and so on, until we calculate the final result we need. In this process, multiple RDD will be strung together through different operator operations (such as map, reduce, etc.). This "RDD string" is RDD lineage, that is, "RDD consanguinity chain".

We should pay attention in the development process: for the same data, only one RDD should be created, not multiple RDD should be created to represent the same data.

When some Spark beginners start to develop Spark jobs, or experienced engineers develop RDD lineage's extremely lengthy Spark jobs, they may forget that they have previously created a RDD for a piece of data, resulting in the creation of multiple RDD for the same data. This means that our Spark job will perform repeated calculations to create multiple RDD representing the same data, which in turn increases the performance overhead of the job.

/ / A map operation is required for the HDFS file named "hello.txt" and another reduce operation is required. That is, you need to perform two operator operations on one piece of data. / / wrong practice: create multiple RDD when performing multiple operator operations on the same piece of data. / / the textFile method is executed twice, two RDD are created for the same HDFS file, and then an operator operation is performed on each RDD. / / in this case, the Spark needs to load the contents of the hello.txt file twice from the HDFS and create two separate RDD; to load the HDFS file the second time and the performance overhead of creating the RDD is obviously wasted. Val rdd1 = sc.textFile ("hdfs://192.168.0.1:9000/hello.txt") rdd1.map (...) val rdd2 = sc.textFile ("hdfs://192.168.0.1:9000/hello.txt") rdd2.reduce (...) / / correct usage: when performing multiple operator operations on a piece of data, only one RDD is used. / / this is obviously much better than the previous one, because we only created one RDD for the same piece of data, and then performed multiple operator operations on that RDD. / / but note that the optimization has not been completed so far. Since rdd1 has been performed two operator operations, the second time the reduce operation is performed, the rdd1 data will be recalculated from the source again, so there will still be performance overhead of double calculation. / / to solve this problem thoroughly, we must combine "principle 3: persistence of RDD that is used many times" to ensure that a RDD is calculated only once when it is used multiple times. Val rdd1 = sc.textFile ("hdfs://192.168.0.1:9000/hello.txt") rdd1.map (...) rdd1.reduce (...)

# # principle 2: reuse the same RDD as much as possible

In addition to avoiding creating multiple RDD for the same piece of data during the development process, reuse one RDD as much as possible when performing operator operations on different data. For example, if one RDD's data format is of type key-value and the other is of a single value type, the value data of the two RDD are exactly the same. Then we can only use that RDD of type key-value at this time, because it already contains the data of another. For cases like this where multiple RDD data overlap or contain, we should reuse one RDD as much as possible, so that the number of RDD can be reduced as much as possible, thus minimizing the number of operator execution.

/ / wrong practice. / / there is a format of RDD, that is, rdd1. / / then, due to business needs, a map operation is performed on rdd1 and a rdd2 is created, and the data in rdd2 is only the value value in rdd1, that is, rdd2 is a subset of rdd1. JavaPairRDD rdd1 =... JavaRDD rdd2 = rdd1.map (...) / / performs different operator operations on rdd1 and rdd2, respectively. Rdd1.reduceByKey (...) rdd2.map (...) / / the right thing to do. / / in the above case, the only difference between rdd1 and rdd2 is that the data format is different. The data of rdd2 is a subset of rdd1, but two rdd are created and an operator operation is performed on both rdd. / / at this point, the operator operation will be performed one more time because the map operator is executed on the rdd1 to create the rdd2, thus increasing the performance overhead. / / in fact, the same RDD can be reused in this case. / / We can use rdd1 to do both reduceByKey and map operations. / / in the second map operation, only the tuple._2 of each data, that is, the value in rdd1, is used. JavaPairRDD rdd1 =... rdd1.reduceByKey (...) rdd1.map (tuple._2...) / / the second method significantly reduces the computational overhead of a rdd2 compared to the first method. / / but so far, the optimization is not over, we have performed two operator operations on rdd1, and rdd1 will actually be evaluated twice. / / therefore, it also needs to be used in conjunction with "principle 3: persistence of RDD that is used multiple times" to ensure that a RDD is calculated only once when it is used multiple times.

# # principle 3: persist RDD that is used many times

When you have done operator operations on a RDD many times in your Spark code, congratulations, you have implemented the first step of optimizing the Spark job, which is to reuse RDD as much as possible. On this basis, it is time for the second step of optimization, that is, to ensure that when multiple operator operations are performed on a RDD, the RDD itself is calculated only once.

The default principle in Spark for executing an operator multiple times for a RDD is this: every time you perform an operator operation on an RDD, you will recalculate from the source, calculate that RDD, and then perform your operator operation on the RDD. The performance of this approach is very poor.

So in this case, our advice is to persist RDD that is used many times. At this point, Spark will save the data in RDD to memory or disk according to your persistence strategy. Each time you perform an operator operation on this RDD, the persistent RDD data is extracted directly from memory or disk, and then the operator is executed without recalculating the RDD from the source and then performing the operator operation.

/ / if you want to persist a RDD, simply call cache () and persist () on the RDD. / / the right thing to do. The / / cache () method says that all the data in the RDD is attempted to be persisted to memory in a non-serialized way. / / when performing two more operator operations on rdd1, the rdd1 will be calculated from the source only when the map operator is executed for the first time. / / when the reduce operator is executed for the second time, the data is extracted directly from memory for calculation, and a rdd is not repeatedly calculated. The val rdd1 = sc.textFile ("hdfs://192.168.0.1:9000/hello.txt") .cache () rdd1.map (...) rdd1.reduce (...) / / persist () method indicates that the persistence level is manually selected and persisted in the specified manner. / / for example, StorageLevel.MEMORY_AND_DISK_SER says that when there is enough memory, it is preferred to persist to memory, and when there is not enough memory, it is persisted to disk files. / / and the _ SER suffix indicates that RDD data is saved by serialization, in which case each partition in RDD is serialized into a large byte array and then persisted to memory or disk. / / serialization can reduce the memory / disk footprint of persistent data, thereby preventing memory from being occupied too much by persistent data, resulting in frequent GC. Val rdd1 = sc.textFile ("hdfs://192.168.0.1:9000/hello.txt"). Steps (StorageLevel.MEMORY_AND_DISK_SER) rdd1.map (...) rdd1.reduce (...)

For the persist () method, we can choose different persistence levels according to different business scenarios.

# # principle 4: avoid using shuffle operators as much as possible

If possible, try to avoid using shuffle class operators. Because the most performance-consuming part of a Spark job is the shuffle process. The shuffle process, to put it simply, is to pull the same key distributed on multiple nodes in the cluster to the same node for operations such as aggregation or join. For example, operators such as reduceByKey and join will trigger shuffle operations.

During the shuffle process, the same key on each node is first written to the local disk file, and then other nodes need to pull the same key from the disk file on each node through network transfer. Moreover, when the same key is pulled to the same node for aggregation operation, there may be too much key processed on one node, resulting in insufficient memory and overwriting to the disk file. Therefore, in the shuffle process, there may be a large number of disk file read and write IO operations, as well as data network transmission operations. Disk IO and network data transmission are also the main reasons for the poor performance of shuffle.

Therefore, in our development process, to avoid as much as possible to avoid using reduceByKey, join, distinct, repartition and other shuffle operators, try to use the map class of non-shuffle operators. In this way, Spark jobs with no shuffle operations or only fewer shuffle operations can greatly reduce performance overhead.

Join code example for Broadcast and map

/ / traditional join operations will result in shuffle operations. / / because in both RDD, the same key needs to be pulled to a node through the network, and the join operation is performed by a task. The join operation of val rdd3 = rdd1.join (rdd2) / / Broadcast+map does not cause the shuffle operation. / / use Broadcast to use a RDD with a small amount of data as a broadcast variable. Val rdd2Data = rdd2.collect () val rdd2DataBroadcast = sc.broadcast (rdd2Data) / / in the rdd1.map operator, you can get all the data for rdd2 from rdd2DataBroadcast. / / then traverse, and if it is found that the key of a piece of data in rdd2 is the same as the key of the current data of rdd1, then it is determined that join can be carried out. / / at this point, you can splice the current data of rdd1 with the data that can be connected in rdd2 (String or Tuple) in any way you need. Val rdd3 = rdd1.map (rdd2DataBroadcast...) / / Note that the above operations are recommended only when the amount of data in rdd2 is relatively small (for example, a few hundred megabytes, or one or two gigabytes). / / because a copy of the full data of Executor resides in the memory of each rdd2.

# # principle 5: shuffle operations using map-side pre-aggregation

If you must use shuffle operations because of business needs and cannot be replaced by operators of the map class, try to use operators that can be pre-aggregated by map-side.

The so-called map-side preaggregation refers to an aggregation operation for the same key locally on each node, similar to the local combiner in MapReduce. After map-side pre-aggregation, there is only one same key locally for each node, because multiple identical key are aggregated. When other nodes pull the same key on all nodes, the amount of data that needs to be pulled is greatly reduced, thus reducing the disk IO and network transmission overhead. Generally speaking, where possible, it is recommended to use reduceByKey or aggregateByKey operators instead of groupByKey operators. Because both the reduceByKey and aggregateByKey operators preaggregate the same key locally to each node using user-defined functions. The groupByKey operator will not carry out pre-aggregation, and the full amount of data will be distributed and transmitted among the nodes of the cluster, so the performance is relatively poor.

For example, the following two pictures are typical examples of word counting based on reduceByKey and groupByKey, respectively. The first diagram is the schematic diagram of groupByKey, and you can see that when there is no local aggregation, all data is transferred between cluster nodes; the second diagram is the schematic diagram of reduceByKey, and you can see that the same key data locally on each node is pre-aggregated before being transferred to other nodes for global aggregation.

# # principle 6: use high-performance operators

In addition to the shuffle-related operators have optimization principles, other operators also have corresponding optimization principles.

Use reduceByKey/aggregateByKey instead of groupByKey

For details, see "principle 5: shuffle operations using map-side preaggregation". Using mapPartitions instead of normal map

MapPartitions class operators, a function call will handle all the data of a partition, rather than a function call to handle one, the performance will be relatively higher. But sometimes there is an OOM (memory overflow) problem when using mapPartitions. Because a single function call will dispose of all the data of a partition, if there is not enough memory, it is impossible to collect too many objects during garbage collection, and an OOM exception is likely to occur. So be careful when using this kind of operation!

Use foreachPartitions instead of foreach

The principle is similar to "using mapPartitions instead of map". It also processes all the data of one partition at a time, rather than one piece of data at a time. In practice, it is found that the operator of foreachPartitions class is very helpful to improve the performance. For example, in the foreach function, if all the data in RDD is written to MySQL, then if it is an ordinary foreach operator, it will be written piece by piece of data, and each function call may create a database connection. At this time, database connections will be created and destroyed frequently, and the performance is very low. However, if you use the foreachPartitions operator to process the data of one partition at a time, then for each partition, just create a database connection, and then perform a batch insert operation, at this time the performance is relatively high. In practice, it is found that the performance of MySQL can be improved by more than 30% for about 10, 000 pieces of data.

Coalesce operation after using filter

Usually, after the filter operator is executed on a RDD to filter out more data in the RDD (such as more than 30% of the data), it is recommended to use the coalesce operator to manually reduce the number of partition in the RDD and compress the data in the RDD into less partition. Because after the filter, a lot of data will be filtered out in each partition of the RDD. If the subsequent calculation is carried out as usual, the amount of data in the partition processed by each task is not very large, which is a bit of a waste of resources, and the more task processed at this time, the slower the speed may be. So reduce the number of partition with coalesce, compress the data in RDD into less partition, and you can process all the partition with less task. In some scenarios, it will be helpful to improve performance.

Using repartitionAndSortWithinPartitions instead of repartition and sort class to operate repartitionAndSortWithinPartitions is a recommended operator on the official website of Spark. It is officially recommended that if you need to sort after repartition re-partition, you should use repartitionAndSortWithinPartitions operator directly. Because this operator can sort while performing the shuffle operation of re-partition. Shuffle and sort operate at the same time, which may have higher performance than first shuffle and then sort.

# # principle 7: broadcasting large variables sometimes encounters scenarios where external variables need to be used in operator functions (especially large variables, such as large collections of more than 100m), so Spark's Broadcast feature should be used to improve performance.

When an external variable is used in an operator function, by default, Spark makes multiple copies of the variable and transmits it over the network to task, where each task has a copy of the variable. If the variables themselves are relatively large (such as 100m or even 1G), the performance overhead of a large number of copies of variables transmitted in the network, as well as the frequent GC caused by excessive memory consumption in the Executor of each node, will greatly affect performance.

Therefore, for the above cases, if you use a large external variable, it is recommended to use the broadcast function of Spark to broadcast the variable. The broadcast variable ensures that only one copy of the variable resides in the memory of each Executor, and that the copy of the variable in the Executor is shared when the task in the Executor executes. In this way, the number of copies of variables can be greatly reduced, thus reducing the performance overhead of network transmission, reducing the overhead of Executor memory, and reducing the frequency of GC.

/ / the following code uses external variables in the operator function. / / No special operations have been done at this time, and each task will have a copy of list1. Val list1 =... rdd1.map (list1...) / / the following code encapsulates list1 as a broadcast variable of type Broadcast. / / in the operator function, when you use broadcast variables, you will first determine whether there is a copy of the variable in the Executor memory where the current task resides. / / if there is one, use it directly; if not, remotely pull a copy from Driver or other Executor nodes and put it in local Executor memory. / / only one copy of the broadcast variable resides in each Executor memory. Val list1 =... val list1Broadcast = sc.broadcast (list1) rdd1.map (list1Broadcast...)

# # principle 8: use Kryo to optimize serialization performance

In Spark, there are three main areas that involve serialization:

When an external variable is used in an operator function, the variable is serialized and transmitted over the network (see principle 7: broadcast large variables).

When you use a custom type as a generic type of RDD (for example, JavaRDD,Student is a custom type), all custom type objects are serialized. Therefore, in this case, the custom class must also implement the Serializable interface

When using a serializable persistence strategy (such as MEMORY_ONLY_SER), Spark serializes each partition in RDD into a large array of bytes.

For all three places where serialization occurs, we can optimize the performance of serialization and deserialization by using the Kryo serialization class library. By default, Spark uses Java's serialization mechanism, ObjectOutputStream/ObjectInputStream API, for serialization and deserialization. But Spark also supports the use of Kryo serialization libraries, and the performance of Kryo serialization libraries is much higher than that of Java serialization libraries. Officially, the Kryo serialization mechanism is about 10 times better than the Java serialization mechanism. The reason why Spark does not use Kryo as a serialization class library by default is that Kryo requires that it is best to register all custom types that need to be serialized, so this approach is troublesome for developers.

The following is a code example that uses Kryo. We just need to set up the serialization class and register the custom type to serialize (for example, the external variable type used in the operator function, the custom type as the RDD generic type, and so on):

/ / create a SparkConf object. Val conf = new SparkConf () .setMaster (...) .setAppName (...) / / sets the serializer to KryoSerializer. Conf.set ("spark.serializer", "org.apache.spark.serializer.KryoSerializer") / / registers the custom type to serialize. Conf.registerKryoClasses (Array (classOf [MyClass1], classof [MyClass2]))

# # principle 9: optimize data structure

In Java, there are three types that consume more memory:

Object, each Java object has additional information such as object headers, references, etc., so it takes up memory space.

String, each string contains an array of characters, length and other additional information

Collection types, such as HashMap, LinkedList, etc., because some inner classes are usually used inside collection types to encapsulate collection elements, such as Map.Entry

Therefore, Spark officially recommends that in the implementation of Spark coding, especially for the code in operator functions, try not to use the above three data structures, try to use strings instead of objects, use primitive types (such as Int, Long) instead of strings, and use arrays instead of collection types, so as to reduce memory footprint as much as possible, thus reducing GC frequency and improving performance.

However, in the author's coding practice, it is not easy to achieve this principle. Because we have to consider the maintainability of the code at the same time, if there is no object abstraction in a code, it is all the way of string concatenation, then it is undoubtedly a great disaster for subsequent code maintenance and modification. Similarly, if all operations are based on arrays instead of using collection types such as HashMap and LinkedList, it is also a great challenge to our coding difficulty and code maintainability. Therefore, the author suggests that, where possible and appropriate, use less memory-intensive data structures, but the premise is to ensure the maintainability of the code. # # Resource tuning

After you have developed the Spark job, it is time to configure the appropriate resources for the job. The resource parameters of Spark can basically be set as parameters in the spark-submit command. Many Spark beginners usually don't know which necessary parameters to set and how to set them, so they can only be set arbitrarily or not at all. Unreasonable setting of resource parameters may cause jobs to run very slowly without making full use of cluster resources, or the set resources are too large and the queue does not have enough resources to provide, resulting in a variety of exceptions. In short, in either case, the Spark job will be inefficient or even impossible to run at all. Therefore, we must have a clear understanding of the principle of resource use of Spark jobs, and know which resource parameters can be set during the running of Spark jobs, and how to set appropriate parameter values.

* * basic operation principle of Spark job * *

The detailed principle is shown in the picture above. After we submit a Spark job using spark-submit, the job starts a corresponding Driver process. Depending on the deployment model (deploy-mode) you use, the Driver process may be started locally or on a worker node in the cluster. The Driver process itself occupies a certain amount of memory and CPU core according to the parameters we set. The first thing the Driver process needs to do is to apply to the cluster manager (which can be a Spark Standalone cluster or other resource management cluster. Meituan and Dianping uses YARN as a resource management cluster) to apply for the resources needed to run Spark jobs. The resources here refer to the Executor process. The YARN cluster manager starts a certain number of Executor processes on each worker node according to the resource parameters we set for the Spark job, and each Executor process occupies a certain amount of memory and CPU core.

After applying for the resources needed to execute the job, the Driver process will begin to schedule and execute the job code we wrote. The Driver process splits the Spark job code we wrote into multiple stage, each stage executes a portion of the code snippet, creates a batch of task for each stage, and then assigns these task to each Executor process for execution. Task is the smallest computing unit that performs exactly the same computing logic (that is, a piece of code we write ourselves), except that each task processes different data. After all the task of a stage is executed, the intermediate result of the calculation is written to the local disk file of each node, and then the Driver will schedule the next stage to run. The input data of the next stage's task is the intermediate result of the previous stage output. This cycle goes on and on until we execute all the logic of the code we have written, calculate all the data, and get the results we want.

Spark divides the stage according to the shuffle class operator. If some shuffle class operator (such as reduceByKey, join, etc.) is executed in our code, a stage boundary is drawn at that operator. It can be roughly understood that the code before the shuffle operator execution will be divided into a stage,shuffle operator execution and the subsequent code will be divided into the next stage. So when a stage starts to execute, each task of it may pull all the key that need to be processed by itself from the node where the task of the previous stage is located, and then perform aggregation operations (such as functions received by the reduceByKey () operator) on all the same key pulled using the operator functions written by ourselves. This process is called shuffle.

When we perform persistence operations such as cache/persist in the code, the data calculated by each task will also be saved to the memory of the Executor process or to the disk file of the node, depending on the persistence level we choose.

Therefore, the memory of Executor is mainly divided into three blocks: the first block is to let task use when executing the code we wrote, which accounts for 20% of the total Executor memory by default; the second block is to let task pull the output of the task of the previous stage through the shuffle process, and then use it for aggregation and other operations, which is also 20% of the total Executor memory by default; and the third block is to make the RDD persistent, which accounts for 60% of the total Executor memory by default.

The execution speed of task is directly related to the number of CPU core per Executor process. A CPU core can only execute one thread at a time. On the other hand, multiple task allocated to each Executor process run concurrently in the way of one thread per task. If the number of CPU core is sufficient and the number of task allocated is reasonable, then generally speaking, you can execute these task threads quickly and efficiently.

The above is the explanation of the basic operation principle of the Spark job, which can be understood in combination with the figure above. Understanding the basic principles of the operation is the basic premise for us to optimize the resource parameters.

# # principle: tuning resource parameters

After understanding the fundamentals of Spark job operation, it is easy to understand the parameters related to resources. In fact, the so-called Spark resource parameter tuning is mainly to optimize the efficiency of resource use by adjusting various parameters in the process of Spark operation, so as to improve the execution performance of Spark jobs. The following parameters are the main resource parameters in Spark. Each parameter corresponds to some part of the operation principle of the job. We also give a reference value for tuning.

Num-executors

Parameter description: this parameter is used to set the total number of Executor processes to execute the Spark job. When Driver applies for resources from the YARN cluster manager, the YARN cluster manager will start the appropriate number of Executor processes on each worker node of the cluster as much as possible according to your settings. This parameter is very important, if not set, the default will only give you to start a small number of Executor processes, when your Spark job is very slow.

Parameter tuning suggestion: it is appropriate for each Spark job to run with about 50 Executor processes, but it is not good for Executor processes with too few or too many settings. Set too little to make full use of cluster resources; if you set too much, most queues may not be able to give sufficient resources.

Executor-memory

Parameter description: this parameter is used to set the memory of each Executor process. The size of Executor memory often directly determines the performance of Spark jobs, and it is also directly related to common JVM OOM exceptions.

Parameter tuning recommendation: the memory setting of 4G~8G for each Executor process is more appropriate. But this is only a reference value, and the specific setting still depends on the resource queue of different departments. You can take a look at the maximum memory limit of your team's resource queue. Num-executors times executor-memory represents the total amount of memory applied for by your Spark job (that is, the total memory of all Executor processes), which cannot exceed the maximum memory of the queue. In addition, if you are sharing the resource queue with the rest of the team, then the total amount of memory requested had better not exceed the maximum total memory of the resource queue, so as to prevent your own Spark jobs from taking up all the resources of the queue and preventing other students' assignments from running.

Executor-cores

Parameter description: this parameter is used to set the number of CPU core for each Executor process. This parameter determines the ability of each Executor process to execute task threads in parallel. Because each CPU core can only execute one task thread at a time, the more CPU core you have per Executor process, the faster you can execute all the task threads assigned to you.

Parameter tuning suggestion: it is appropriate to set the number of CPU core of Executor to 2-4. It is also determined according to the resource queues of different departments. You can take a look at the maximum CPU core limit of your resource queue, and then determine how many Executor can be allocated to each Executor process according to the number of CPU core you set. It is also recommended that if you are sharing this queue with others, it is appropriate that the num-executors * executor-cores should not exceed the total CPU core of the queue. It is also recommended to avoid affecting the operation of other students' homework.

Driver-memory

Parameter description: this parameter is used to set the memory of the Driver process.

Parameter tuning suggestion: generally speaking, the memory of Driver is not set, or about 1 GB should be enough. The only thing to note is that if you need to use the collect operator to pull all the data from RDD to Driver for processing, you must make sure that the memory of Driver is large enough, otherwise there will be an OOM memory overflow.

Spark.default.parallelism

Parameter description: this parameter is used to set the default number of task for each stage. This parameter is extremely important, if not set may directly affect the performance of your Spark job.

Parameter tuning recommendation: the default number of task for Spark jobs is 500 to 1000. A common mistake made by many students is not to set this parameter, which will cause Spark to set the number of task based on the number of block of the underlying HDFS. By default, one HDFS block corresponds to one task. Generally speaking, the number of Spark default settings is too small (for example, only a few dozen task), if the number of task is too small, it will cause all the previous efforts to set Executor parameters to be wasted. Just imagine, no matter how many Executor processes you have, how much memory and CPU you have, but only 1 or 10 task, then 90% of Executor processes may not have task execution at all, which is a waste of resources! Therefore, the setting principle recommended on the official website of Spark is that it is appropriate to set this parameter to 2 / 3 times of num-executors * executor-cores. For example, if the total number of CPU core of Executor is 300, then it is OK to set 1000 task. At this time, you can make full use of the resources of Spark cluster.

* * spark.storage.memoryFraction**

Parameter description: this parameter is used to set the percentage of RDD persistence data in Executor memory. The default is 0.6. In other words, 60% of the default Executor memory can be used to hold persistent RDD data. Depending on the persistence strategy you choose, if there is not enough memory, the data may not be persisted, or the data may be written to disk.

Parameter tuning suggestion: if there are more RDD persistence operations in the Spark job, the value of this parameter can be increased appropriately to ensure that the persisted data can be contained in memory. Avoid not having enough memory to cache all the data, so that the data can only be written to disk, which degrades performance. However, if there are more shuffle class operations and fewer persistence operations in the Spark job, it is more appropriate to lower the value of this parameter. In addition, if it is found that the job runs slowly due to frequent gc (the gc time of the job can be observed through spark web ui), which means that task does not have enough memory to execute user code, it is also recommended to lower the value of this parameter.

Spark.shuffle.memoryFraction

Parameter description: this parameter is used to set the proportion of Executor memory that can be used for aggregation operation after a task pulls the task output of the previous stage in the shuffle process. The default is 0.2. In other words, Executor defaults to only 20% of the memory used for this operation. If the memory used by the shuffle operation exceeds this 20% limit when aggregating, the excess data will be overwritten to the disk file, which will greatly degrade performance.

Parameter tuning suggestion: if there are fewer RDD persistence operations and more shuffle operations in Spark jobs, it is recommended to reduce the memory share of persistence operations and increase the memory share of shuffle operations, so as to avoid insufficient memory when there is too much data in the shuffle process, and must be overwritten to disk, thus reducing performance. In addition, if it is found that the job runs slowly due to frequent gc, which means that task does not have enough memory to execute user code, it is also recommended to lower the value of this parameter.

There is no fixed value for tuning resource parameters, so students are required to set the above parameters reasonably according to their actual conditions (including the number of shuffle operations in Spark assignments, the number of RDD persistence operations and the homework gc shown in spark web ui), and refer to the principles and tuning suggestions given in this article.

. / bin/spark-submit\-- master yarn-cluster\-- num-executors 100\-- executor-memory 6G\-- executor-cores 4\-- driver-memory 1G\-- conf spark.default.parallelism=1000\-- conf spark.storage.memoryFraction=0.5\-- conf spark.shuffle.memoryFraction=0.3\ so far, I believe you have a deeper understanding of "what are the basics of Spark performance optimization". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report