Significant Parameters in YARN
Qubole offers a Spark-on-YARN variant, so the YARN parameters apply to both Hadoop 2 and Spark.
The parameters that can be useful in Hadoop 2 (Hive) and Spark configuration are described in the following sub-topics.
All platforms:
- Configuring Job History Compression describes compressing job history to store it in a Cloud location. 
- Configuring Job Runtime describes how to configure how long a YARN application can run. 
- Enabling Container Packing in Hadoop 2 and Spark describes how to more effectively downscale in Hadoop 2 (Hive) and Spark clusters. 
- Understanding YARN Virtual Cores describes the YARN virtual cores. 
- Configuring Direct File Output Committer describes configuring a DirectFileOutputCommitter (DFOC) for a MapReduce task in a Hadoop 2 cluster. For DFOC on a Spark cluster, see DFOC in Spark. 
- Improving Performance of Data Writes shows how to improve the performance of data writes in Hadoop 2 and Spark. 
See also:
- An Overview of Heterogeneous Nodes in Clusters (explains how to configure heterogeneous nodes in Hadoop 2 (Hive) and Spark clusters) 
Currently AWS-only:
- Configuring Timeout to Schedule AMs on Spot Nodes (AWS) describes configuring a timeout for a Resource Manager (RM) to schedule Application Masters (AMs) on AWS Spot nodes. 
- Mapping an AWS S3 Bucket to an AWS Region describes mapping AWS S3 buckets to AWS regions on the S3a filesystem. 
- Handling Eventual Consistency Failures in Hadoop FileOutputCommitter Jobs (AWS) describes how to handle eventual consistency and increase the speed of data writes in FileOutputCommitter jobs. 
- Configuring Multipart File Output Committer describes the multipart File Output Committer configuration supported only for Hadoop jobs. 
See also:
- Enabling SSE-KMS while using Hadoop DistCp for information on the parameters. 
- Configuring EBS Upscaling in AWS Hadoop and Spark Clusters describes how you can use EBS upscaling and save cluster running costs in Hadoop 2 (Hive) and Spark clusters. node_configuration and ebs_upscaling_config provide API parameters related to dynamically increasing the EBS volume storage capacity in Hadoop 2 (Hive) and Spark clusters. 
Note
See Composing a Hadoop Job for information on composing a Hadoop job.
Configuring Job History Compression
mapreduce.jobhistory.completed.codec specifies the codec to use to compress the job history files while storing them
in a Cloud location. The default value is com.hadoop.compression.lzo.LzopCodec.
Configuring Job Runtime
Use yarn.resourcemanager.app.timeout.minutes to configure how many minutes a YARN application can run.
This parameter can prevent a runaway application from keeping the cluster alive unnecessarily.
This is a cluster-level setting; set it in the Override Hadoop Configuration Variables field under the Advanced Configuration tab of the Clusters page in the QDS UI. See Advanced Configuration: Modifying Hadoop Cluster Settings for more information.
The Resource Manager kills a YARN application if it runs longer than the configured timeout.
Setting this parameter to -1 means that the application never times out.
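The timeout rule can be read as a simple predicate. The following is a minimal sketch of that rule, not Resource Manager code; the function name exceeds_timeout and its arguments are hypothetical.

```python
def exceeds_timeout(elapsed_minutes: float, timeout_minutes: int) -> bool:
    """Return True if an application has run longer than the configured timeout.

    Models yarn.resourcemanager.app.timeout.minutes as described above:
    a value of -1 means the application never times out.
    """
    if timeout_minutes == -1:
        return False
    return elapsed_minutes > timeout_minutes


print(exceeds_timeout(121, 120))    # True: the application would be killed
print(exceeds_timeout(10_000, -1))  # False: no timeout is configured
```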
Enabling Container Packing in Hadoop 2 and Spark
Qubole allows you to pack containers in Hadoop 2 (Hive) and Spark. You must enable this feature; it is disabled by default. When enabled, container packing causes the scheduler to pack containers on a subset of nodes instead of distributing them across all the nodes of the cluster. This increases the probability of some nodes remaining unused; these nodes become eligible for downscaling, reducing your cost.
How Container Packing Works
Packing works by separating nodes into three sets:
- Nodes with no containers (the Low set) 
- Nodes with memory utilization greater than the threshold (the High set) 
- All other nodes (the Medium set) 
When container packing is enabled, YARN schedules each container request in this order: nodes in the Medium set first, nodes in the Low set next, and nodes in the High set last.
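The three sets and the scheduling order can be sketched as follows. This is an illustration of the rule described above, not the scheduler's implementation: the Node type and function names are hypothetical, the 60 percent threshold is the documented default, and keeping the input order within a set is an assumption.

```python
from typing import List, NamedTuple


class Node(NamedTuple):
    name: str
    containers: int          # number of containers currently on the node
    memory_used_pct: float   # memory utilization, in percent


def classify(node: Node, high_threshold_pct: float = 60.0) -> str:
    """Place a node in the Low, Medium, or High set."""
    if node.containers == 0:
        return "low"
    if node.memory_used_pct > high_threshold_pct:
        return "high"
    return "medium"


def scheduling_order(nodes: List[Node], high_threshold_pct: float = 60.0) -> List[str]:
    """Return node names in the order packing would consider them."""
    rank = {"medium": 0, "low": 1, "high": 2}
    # sorted() is stable, so nodes in the same set keep their input order.
    ordered = sorted(nodes, key=lambda n: rank[classify(n, high_threshold_pct)])
    return [n.name for n in ordered]


nodes = [
    Node("a", containers=0, memory_used_pct=0.0),   # Low
    Node("b", containers=4, memory_used_pct=75.0),  # High
    Node("c", containers=2, memory_used_pct=30.0),  # Medium
]
print(scheduling_order(nodes))  # ['c', 'a', 'b']
```

Because the empty node "a" is tried only after the partly used node "c", it is more likely to stay empty and become eligible for downscaling.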
Configuring Container Packing
Configure container packing as a Hadoop cluster override in the Override Hadoop Configuration Variables field on the Edit Cluster page. See Managing Clusters for more information. The configuration options are:
- To enable container packing, set yarn.scheduler.fair.continuous-scheduling-packed=true.
- In clusters smaller than the configured minimum size, containers are distributed across all nodes. This minimum number of nodes is governed by yarn.scheduler.fair.continuous-scheduling-packed.min.nodes=<value>. Its default value is 5.
- A node’s memory-utilization threshold percentage, above which Qubole schedules containers on another node, is governed by yarn.scheduler.fair.continuous-scheduling-packed.high.memory.threshold=<value>. Its default value is 60. This parameter also denotes the threshold above which a node moves from the Medium set to the High set.
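As a rough illustration, the three settings can be assembled into override text, and the minimum-size rule expressed as a check. The helper names are hypothetical, and the one key=value pair per line layout of the override field is an assumption.

```python
def packing_overrides(min_nodes: int = 5, high_memory_threshold: int = 60) -> str:
    """Render the three container-packing settings as key=value lines."""
    prefix = "yarn.scheduler.fair.continuous-scheduling-packed"
    lines = [
        f"{prefix}=true",
        f"{prefix}.min.nodes={min_nodes}",
        f"{prefix}.high.memory.threshold={high_memory_threshold}",
    ]
    return "\n".join(lines)


def packing_applies(cluster_size: int, min_nodes: int = 5) -> bool:
    """Packing is skipped in clusters smaller than the configured minimum."""
    return cluster_size >= min_nodes


print(packing_overrides(min_nodes=8, high_memory_threshold=70))
print(packing_applies(cluster_size=4))  # False: containers are spread across all nodes
```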
Understanding YARN Virtual Cores
As of Hadoop 2.4, YARN introduced the concept of vcores (virtual cores). A vcore is a share of host CPU that the YARN Node Manager allocates to available resources.
yarn.scheduler.maximum-allocation-vcores is the maximum allocation for each container request at the Resource Manager,
in terms of virtual CPU cores. Requests higher than this value do not take effect as submitted; they are capped at this value.
The default value for yarn.scheduler.maximum-allocation-vcores in Qubole is twice the number of CPUs. This
oversubscription assumes that CPUs are not always running a thread, so assigning more cores enables maximum CPU
utilization.
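The default and the capping behavior amount to simple arithmetic. A minimal sketch, with hypothetical function names and an 8-CPU node chosen only for illustration:

```python
def default_max_vcores(cpu_count: int) -> int:
    """Qubole default described above: twice the number of CPUs."""
    return 2 * cpu_count


def effective_vcores(requested: int, max_allocation_vcores: int) -> int:
    """Cap a container request at yarn.scheduler.maximum-allocation-vcores."""
    return min(requested, max_allocation_vcores)


limit = default_max_vcores(8)       # 16 on a hypothetical 8-CPU node
print(effective_vcores(24, limit))  # 16: the request is capped
print(effective_vcores(4, limit))   # 4: the request is within the limit
```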
Configuring Direct File Output Committer
In general, the final output of a MapReduce job is written to a location in Cloud storage or HDFS, but is first written into a temporary location. The output data is moved from the temporary location to the final location in the task’s commit phase.
When DirectFileOutputCommitter (DFOC) is enabled, the output data is written directly to the final location. In this case, a commit phase is not required. DFOC is a Qubole-specific parameter that is also supported by other big-data vendors. Qubole supports DFOC on Amazon S3n and S3a, and Azure Blob and Data Lake storage.
Note
For DFOC on a Spark cluster, see DFOC in Spark.
The pros and cons of setting DFOC are:
Pros:
- Avoids AWS Eventual Consistency issues (Eventual Consistency no longer applies in the AWS us-east-1 region). 
- Improves performance when data is written to a Cloud location. (DFOC does not have much impact on performance when data is written into an HDFS location, because in HDFS the movement of files from one directory to another directory is very fast.) 
Cons:
- DFOC does not perform well in case of failure: in these cases, stale data may be left in the final location and workflows are generally designed to delete the final location. Hence Qubole does not enable DFOC by default. If DFOC is disabled, the abort phase of the task deletes the data in the temporary directory and a retry takes care of data deletion; no stale data is left in the final location. 
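The trade-off can be illustrated with a local-filesystem analogy. This is only a sketch of the two write strategies under simulated failure, not Hadoop committer code; all function names are hypothetical, and a local directory stands in for the Cloud location.

```python
import os
import tempfile


def write_rows(path, rows, fail_after=None):
    """Write rows to path; optionally raise partway through to mimic a task failure."""
    with open(path, "w") as f:
        for i, row in enumerate(rows):
            if fail_after is not None and i == fail_after:
                raise RuntimeError("simulated task failure")
            f.write(row + "\n")


def write_with_commit(final_dir, name, rows, fail_after=None):
    """Default behavior: write to a temporary location, then move on commit."""
    tmp_dir = os.path.join(final_dir, "_temporary")
    os.makedirs(tmp_dir, exist_ok=True)
    tmp_path = os.path.join(tmp_dir, name)
    try:
        write_rows(tmp_path, rows, fail_after)
    except RuntimeError:
        os.remove(tmp_path)  # abort phase: delete the temporary data
        return False
    os.replace(tmp_path, os.path.join(final_dir, name))  # commit phase
    return True


def write_direct(final_dir, name, rows, fail_after=None):
    """DFOC-like behavior: write straight to the final location, no commit."""
    try:
        write_rows(os.path.join(final_dir, name), rows, fail_after)
    except RuntimeError:
        return False  # nothing cleans up the partial file
    return True


with tempfile.TemporaryDirectory() as out:
    write_with_commit(out, "part-0", ["a", "b", "c"], fail_after=2)
    print(os.path.exists(os.path.join(out, "part-0")))  # False: no stale data
    write_direct(out, "part-1", ["a", "b", "c"], fail_after=2)
    print(os.path.exists(os.path.join(out, "part-1")))  # True: partial file remains
```

On a local disk the move in the commit phase is cheap; on Cloud object storage it is the expensive step that writing directly avoids.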
Enabling DFOC
DFOC can be set in the MapReduce APIs mapred and mapreduce as follows:
- DFOC in Mapred API: - mapred.output.committer.class=org.apache.hadoop.mapred.DirectFileOutputCommitter
- DFOC in Mapreduce API: - mapreduce.use.directfileoutputcommitter=true
To set these parameters for a cluster, navigate to the Clusters section of the QDS UI, choose the cluster, and enter both strings in the Override Hadoop Configuration Variables field under the Advanced Configuration tab. You can also set them at the job level.
Improving Performance of Data Writes
To improve the speed of data writes, set the following Qubole options to true:
- mapreduce.use.parallelmergepaths for Hadoop 2 jobs
- spark.hadoop.mapreduce.use.parallelmergepaths for Spark jobs with Parquet data
Handling Eventual Consistency Failures in Hadoop FileOutputCommitter Jobs (AWS)
YARN does not honor DFOC when appending Parquet files, so it is forced to use FileOutputCommitter. To handle
Eventual Consistency errors during FileOutputCommitter data writes to S3, and to increase the speed of data writes,
Qubole provides the configurable option mapreduce.use.parallelmergepaths for Hadoop 2 jobs. Set the option
to true and the FileOutputCommitter version to 1.
The equivalent parameter to set in Spark jobs with Parquet data is spark.hadoop.mapreduce.use.parallelmergepaths. For
more information, see Handling Eventual Consistency Failures in Spark FileOutputCommitter Jobs (AWS).
Configuring Multipart File Output Committer
QDS now supports Multipart File Output Committer (MFOC). For more information, refer to this link.
In QDS, this configuration is applicable only to Hadoop jobs when the s3a file system is enabled. It is an open-beta feature.
Note
This MFOC configuration does not apply to Hive jobs. For MFOC on Spark, see Multipart Upload Based File Output Committer in Spark on Qubole (AWS).
To enable this committer for Hadoop jobs, set the following properties as Hadoop overrides on the cluster configuration UI page.
mapreduce.outputcommitter.factory.class=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
fs.s3a.committer.name=<directory | partitioned>
fs.s3a.committer.staging.conflict-mode=<fail | append | replace>
mapreduce.fileoutputcommitter.algorithm.version=1
In the above configuration, fs.s3a.committer.name accepts these two values:
- directory: Use it when the S3 location is a directory.
- partitioned: Use it when you have partitioned data on the S3 location.
Note
You must set fs.s3a.committer.staging.conflict-mode when the output directory or partition already exists
on the S3 location. The default value is fail. You can also set it to append or replace
as required.
As it is a cluster-level property, a cluster restart is required for the updated configuration to be effective.
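A small helper can guard against typos in the two values that vary. This is a sketch only: the helper name is hypothetical, the accepted values are the ones listed above, and the one key=value pair per line layout of the override field is an assumption.

```python
COMMITTER_NAMES = {"directory", "partitioned"}
CONFLICT_MODES = {"fail", "append", "replace"}


def mfoc_overrides(committer_name: str, conflict_mode: str = "fail") -> str:
    """Render the four MFOC properties listed above as key=value lines."""
    if committer_name not in COMMITTER_NAMES:
        raise ValueError(f"unsupported committer name: {committer_name}")
    if conflict_mode not in CONFLICT_MODES:
        raise ValueError(f"unsupported conflict mode: {conflict_mode}")
    return "\n".join([
        "mapreduce.outputcommitter.factory.class="
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
        f"fs.s3a.committer.name={committer_name}",
        f"fs.s3a.committer.staging.conflict-mode={conflict_mode}",
        "mapreduce.fileoutputcommitter.algorithm.version=1",
    ])


print(mfoc_overrides("partitioned", conflict_mode="replace"))
```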
Note
When you are using MFOC, it is recommended to configure the S3 bucket-level policy to delete pending multipart uploads which are older than 7 days. For more information, see Policy for Aborting Incomplete Multipart Upload.
Here is an S3 bucket-level policy for deleting pending multipart uploads that are older than 7 days.
LifecycleConfiguration={
   'Rules': [
       {
           'ID': 'MultipartUpload',
           'Prefix': "",
           'Status': 'Enabled',
           'AbortIncompleteMultipartUpload': {
               'DaysAfterInitiation': 7
           }
       },
   ]
}
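The dictionary above has the shape of the LifecycleConfiguration argument to the boto3 S3 client's put_bucket_lifecycle_configuration call. The following is a minimal sketch of applying it, assuming boto3 is installed and AWS credentials are configured; the helper names and the bucket name are hypothetical placeholders.

```python
def multipart_cleanup_rules(days: int = 7) -> dict:
    """Build the lifecycle configuration shown above for a given age in days."""
    return {
        "Rules": [
            {
                "ID": "MultipartUpload",
                "Prefix": "",  # empty prefix: the rule covers the whole bucket
                "Status": "Enabled",
                "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": days},
            }
        ]
    }


def apply_cleanup_policy(bucket: str, days: int = 7) -> None:
    """Attach the policy to a bucket (requires boto3 and AWS credentials)."""
    import boto3  # imported here so the builder above works without boto3

    boto3.client("s3").put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=multipart_cleanup_rules(days),
    )


# apply_cleanup_policy("my-example-bucket")  # placeholder bucket name
```

Note that this call replaces any lifecycle configuration already set on the bucket, so existing rules should be merged into the Rules list before applying it.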
Advantages of MFOC
MFOC provides these benefits:
- Overall job performance when writing to S3 improves by up to 13%. This figure comes from comparing the commit time using MFOC with the commit time using File Output Committer V2. 
- The intermediate task outputs of a job are not visible on the S3 location until the job completes. 
- It works well with Speculation. Unlike DirectFileOutputCommitter, MFOC does not cause job failures when used in conjunction with Speculation. 
- It reduces eventual consistency issues because there are no renames in the job and task commit processes. 
Mapping an AWS S3 Bucket to an AWS Region
Qubole supports mapping an AWS S3 bucket to an AWS Region on Hadoop 2 (Hive) clusters using the S3a filesystem. This feature is useful when Hadoop has to work with AWS S3 buckets in different AWS regions. It can be set as a Hadoop override in the REST API call for a Hadoop 2 (Hive) cluster or in the cluster UI configuration. For information on adding a Hadoop override through the UI, see Advanced Configuration: Modifying Hadoop Cluster Settings.
For information on adding a Hadoop override through a REST API call, see hadoop_settings.
The parameter used as a Hadoop override for configuring S3-bucket-to-AWS-region mapping is fs.s3.awsBucketToRegionMapping.
Here is an example of the S3-bucket-to-AWS-region mapping configuration.
fs.s3.awsBucketToRegionMapping = {"acm-354-test": "s3.ap-northeast-2.amazonaws.com", "acm-920": "s3.ap-south-1.amazonaws.com"}
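Because the value is a JSON object, generating it avoids quoting mistakes when many buckets are involved. A minimal sketch: the helper name is hypothetical, and the s3.<region>.amazonaws.com endpoint pattern is inferred from the example above rather than stated by it.

```python
import json


def bucket_region_override(bucket_to_region: dict) -> str:
    """Render fs.s3.awsBucketToRegionMapping from a bucket -> region dict."""
    endpoints = {
        bucket: f"s3.{region}.amazonaws.com"
        for bucket, region in bucket_to_region.items()
    }
    return "fs.s3.awsBucketToRegionMapping = " + json.dumps(endpoints)


print(bucket_region_override({
    "acm-354-test": "ap-northeast-2",
    "acm-920": "ap-south-1",
}))
```

With these two buckets, the printed line matches the example configuration above.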