Significant Parameters in YARN

Qubole offers a Spark-on-YARN variant, so the YARN parameters apply to both Hadoop 2 and Spark.

The parameters that can be useful in Hadoop 2 (Hive) and Spark configuration are described in the following sub-topics.


Note

See Composing a Hadoop Job for information on composing a Hadoop job.

Configuring Job History Compression

mapreduce.jobhistory.completed.codec specifies the codec to use to compress the job history files while storing them in a Cloud location. The default value is com.hadoop.compression.lzo.LzopCodec.
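If LZO is not what you want, you can override the codec. For example, a sketch using Hadoop's built-in gzip codec (the choice of GzipCodec here is illustrative, not a Qubole default):

```properties
# Example Hadoop override: store job history files gzip-compressed instead of LZO.
# GzipCodec ships with stock Hadoop; any codec available on the cluster works.
mapreduce.jobhistory.completed.codec=org.apache.hadoop.io.compress.GzipCodec
```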

Configuring Job Runtime

Use yarn.resourcemanager.app.timeout.minutes to configure how many minutes a YARN application can run. This parameter can prevent a runaway application from keeping the cluster alive unnecessarily.

This is a cluster-level setting; set it in the Override Hadoop Configuration Variables field under the Advanced Configuration tab of the Clusters page in the QDS UI. See Advanced configuration: Modifying Hadoop Cluster Settings for more information.

The Resource Manager kills a YARN application if it runs longer than the configured timeout.

Setting this parameter to -1 means that the application never times out.
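For example, to kill any YARN application that runs longer than six hours, you could set the following override (the six-hour value is illustrative):

```properties
# Kill YARN applications that run longer than 6 hours (360 minutes).
yarn.resourcemanager.app.timeout.minutes=360
```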

Enabling Container Packing in Hadoop 2 and Spark

Qubole allows you to pack containers in Hadoop 2 (Hive) and Spark. You must enable this feature; it is disabled by default. When enabled, container packing causes the scheduler to pack containers on a subset of nodes instead of distributing them across all the nodes of the cluster. This increases the probability of some nodes remaining unused; these nodes become eligible for downscaling, reducing your cost.

How Container Packing Works

Packing works by separating nodes into three sets:

  • Nodes with no containers (the Low set)
  • Nodes with memory utilization greater than the threshold (the High set)
  • All other nodes (the Medium set)

When container packing is enabled, YARN schedules each container request in this order: nodes in the Medium set first, nodes in the Low set next, and nodes in the High set last.
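The set classification and scheduling order can be sketched in Python (illustrative only, not Qubole's implementation; the helper names are hypothetical and the threshold mirrors the parameter described below):

```python
# Illustrative sketch of container packing (not Qubole's implementation).
# A node's set is derived from its container count and memory utilization.

HIGH_MEMORY_THRESHOLD = 60  # percent; mirrors the high.memory.threshold parameter

def packing_set(node):
    """Return 'low', 'medium', or 'high' for a node."""
    if node["containers"] == 0:
        return "low"                      # no containers yet
    if node["mem_used_pct"] > HIGH_MEMORY_THRESHOLD:
        return "high"                     # heavily utilized
    return "medium"

def candidate_order(nodes):
    """Order scheduling candidates: Medium first, then Low, then High."""
    rank = {"medium": 0, "low": 1, "high": 2}
    return sorted(nodes, key=lambda n: rank[packing_set(n)])

nodes = [
    {"name": "n1", "containers": 0, "mem_used_pct": 0},   # Low
    {"name": "n2", "containers": 3, "mem_used_pct": 40},  # Medium
    {"name": "n3", "containers": 5, "mem_used_pct": 75},  # High
]
print([n["name"] for n in candidate_order(nodes)])  # → ['n2', 'n1', 'n3']
```

Because Low-set nodes are tried only after Medium-set nodes, empty nodes tend to stay empty and become eligible for downscaling.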

Configuring Container Packing

Configure container packing as a Hadoop cluster override in the Override Hadoop Configuration Variables field on the Edit Cluster page. See Managing Clusters for more information. The configuration options are:

  • To enable container packing, set yarn.scheduler.fair.continuous-scheduling-packed=true.

  • In clusters smaller than the configured minimum size, containers are distributed across all nodes. This minimum number of nodes is governed by the following parameter:

    yarn.scheduler.fair.continuous-scheduling-packed.min.nodes=<value>. Its default value is 5.

  • A node’s memory-utilization threshold percentage, above which Qubole schedules containers on another node, is governed by the following parameter:

    yarn.scheduler.fair.continuous-scheduling-packed.high.memory.threshold=<value>. Its default value is 60.

    This parameter also denotes the threshold above which a node moves to the High set from the Medium set.
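Putting the three settings together, a typical override block might look like this (the values shown are the defaults, apart from enabling the feature itself):

```properties
# Enable container packing.
yarn.scheduler.fair.continuous-scheduling-packed=true
# Below this cluster size, containers are spread across all nodes (default 5).
yarn.scheduler.fair.continuous-scheduling-packed.min.nodes=5
# Memory-utilization percentage above which a node joins the High set (default 60).
yarn.scheduler.fair.continuous-scheduling-packed.high.memory.threshold=60
```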

Understanding YARN Virtual Cores

As of Hadoop 2.4, YARN introduced the concept of vcores (virtual cores). A vcore is a share of host CPU that the YARN NodeManager allocates to running containers.

yarn.scheduler.maximum-allocation-vcores is the maximum allocation for each container request at the Resource Manager, in terms of virtual CPU cores. Requests for more vcores than this do not take effect; they are capped to this value.

The default value for yarn.scheduler.maximum-allocation-vcores in Qubole is twice the number of CPUs. This oversubscription assumes that CPUs are not always running a thread, so assigning more vcores enables maximum CPU utilization.
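The capping behavior can be sketched as follows (illustrative Python, not YARN code; the helper name is hypothetical):

```python
def effective_vcores(requested, num_cpus, max_allocation=None):
    """Cap a container's vcore request at the scheduler maximum.

    Qubole's default maximum is twice the CPU count (oversubscription).
    """
    if max_allocation is None:
        max_allocation = 2 * num_cpus
    return min(requested, max_allocation)

# A request for 20 vcores on an 8-CPU node is capped at 16.
print(effective_vcores(20, 8))  # → 16
```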

Configuring Direct File Output Committer (AWS, Azure)

In general, the final output of a MapReduce job is written to a location in Cloud storage or HDFS, but is first written into a temporary location. The output data is moved from the temporary location to the final location in the task’s commit phase.

When DirectFileOutputCommitter (DFOC) is enabled, the output data is written directly to the final location, so no commit phase is required. DFOC is a Qubole-specific parameter that some other big-data vendors also support. Qubole supports DFOC on Amazon S3n and S3a, and on Azure Blob and Data Lake storage.

The pros and cons of setting DFOC are:

Pros:

  • Avoids AWS eventual-consistency issues. (Eventual consistency no longer applies in the AWS us-east-1 region.)
  • Improves performance when data is written to a Cloud location. (DFOC does not have much impact on performance when data is written to an HDFS location, because in HDFS moving files from one directory to another is very fast.)

Cons:

  • DFOC does not perform well in the case of failure: stale data may be left in the final location, and workflows are generally designed to delete the final location. Hence Qubole does not enable DFOC by default. When DFOC is disabled, the abort phase of the task deletes the data in the temporary directory and a retry takes care of data deletion; no stale data is left in the final location.

Enabling DFOC

DFOC can be set in the MapReduce APIs mapred and mapreduce as follows:

  • DFOC in Mapred API:

    mapred.output.committer.class=org.apache.hadoop.mapred.DirectFileOutputCommitter

  • DFOC in Mapreduce API: mapreduce.use.directfileoutputcommitter=true

To set these parameters for a cluster, navigate to the Clusters section of the QDS UI, choose the cluster, and enter both strings in the Override Hadoop Configuration Variables field under the Advanced Configuration tab. You can also set them at the job level.

Improving Performance of Data Writes

To improve the speed of data writes, set the following Qubole options to true:

  • mapreduce.use.parallelmergepaths for Hadoop 2 jobs
  • spark.hadoop.mapreduce.use.parallelmergepaths for Spark jobs with Parquet data.

Handling Eventual Consistency Failures in Hadoop FileOutputCommitter Jobs (AWS)

YARN does not honor DFOC when appending Parquet files, and is thus forced to use FileOutputCommitter. To handle eventual-consistency errors during FileOutputCommitter writes to S3, and to increase the speed of data writes, Qubole provides the configurable option mapreduce.use.parallelmergepaths for Hadoop 2 jobs. Set this option to true and set the FileOutputCommitter version to 1.

The equivalent parameter to set in Spark jobs with Parquet data is spark.hadoop.mapreduce.use.parallelmergepaths. For more information, see Handling Eventual Consistency Failures in Spark FileOutputCommitter Jobs (AWS).
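For a Hadoop 2 job, the combination described above can be expressed as the following overrides:

```properties
# Use parallel merge paths to speed up S3 writes and handle consistency errors.
mapreduce.use.parallelmergepaths=true
# Use FileOutputCommitter version 1, as required with this option.
mapreduce.fileoutputcommitter.algorithm.version=1
```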

Configuring Multipart File Output Committer

QDS supports the Multipart File Output Committer (MFOC).

In QDS, this configuration is applicable only to Hadoop jobs when the s3a file system is enabled. It is an open-beta feature.

Note

This MFOC configuration does not apply to Hive jobs.

Create a ticket with Qubole Support if you want to use MFOC on Spark. Do not set any MFOC configuration on a Spark cluster yourself; Qubole Support configures it.

To enable this committer for Hadoop jobs, set the following properties as Hadoop overrides on the cluster configuration UI page.

mapreduce.outputcommitter.factory.class=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
fs.s3a.committer.name=<directory | partitioned>
fs.s3a.committer.staging.conflict-mode=<fail | append | replace>
mapreduce.fileoutputcommitter.algorithm.version=1

In the above configuration, fs.s3a.committer.name accepts these two values:

  • directory - Use it when the S3 location is a directory.
  • partitioned - Use it when you have partitioned data on the S3 location.

Note

You must set fs.s3a.committer.staging.conflict-mode when the output directory or partition already exists on the S3 location. The default value is fail; you can also set it to append or replace as required.

As it is a cluster-level property, a cluster restart is required for the updated configuration to be effective.

Note

When you are using MFOC, Qubole recommends configuring an S3 bucket-level policy to delete pending multipart uploads that are older than 7 days. For more information, see Policy for Aborting Incomplete Multipart Upload.

Here is a sketch, using the boto3 Python SDK, of applying such an S3 bucket-level policy that deletes pending multipart uploads older than 7 days (the bucket name is a placeholder):

import boto3

s3 = boto3.client('s3')
s3.put_bucket_lifecycle_configuration(
    Bucket='<your-bucket>',  # replace with your bucket name
    LifecycleConfiguration={
        'Rules': [
            {
                'ID': 'MultipartUpload',
                'Prefix': '',
                'Status': 'Enabled',
                'AbortIncompleteMultipartUpload': {
                    'DaysAfterInitiation': 7
                }
            },
        ]
    }
)

Advantages of MFOC

MFOC provides these benefits:

  • Overall job performance improves by up to 13% when writing to S3, based on comparing commit times using MFOC against File Output Committer V2.
  • The intermediate task outputs of a job are not visible on the S3 location until the job completes.
  • It works well with speculative execution: unlike DirectFileOutputCommitter, MFOC does not cause job failures when used with speculation.
  • It reduces eventual consistency issues because there are no renames in the job and task commit processes.

Configuring Timeout to Schedule AMs on Spot Nodes (AWS)

Qubole has introduced yarn.scheduler.qubole.am-on-stable.timeout.ms to set a timeout, in milliseconds, for scheduling Application Masters (AMs) on Spot nodes. By default, Qubole does not schedule AMs on AWS Spot nodes, because such nodes can go away at any time, and losing the AM of a YARN application is disastrous for that application. This default is expressed by setting the parameter to -1. The exceptions to this rule are as follows:

  • If the timeout is overridden, Qubole waits for that long, from the time the AM was requested, to try to schedule it on an On-Demand node. After the timeout expires, Qubole schedules the AM on a Spot node if necessary.
  • If the cluster uses 100% Spot instances for autoscaling and the minimum cluster size is less than 5, Qubole does not wait; it immediately schedules the AM on a Spot node if capacity is available. This too can be overridden to wait for some time using the above option.

When you use this parameter to set a timeout, the RM tries to schedule AMs on stable nodes first; once the timeout expires and the RM has been unable to schedule the AM, it considers Spot nodes as well.

When you set the parameter’s value to 0, RM immediately considers all nodes when trying to schedule the AM.

This parameter can take the following values:

  • -1: the default; AMs are never scheduled on volatile Spot nodes.
  • 0: the RM schedules AMs on volatile Spot nodes whenever possible (it waits 0 milliseconds).
  • Any other value: the RM waits that many milliseconds before scheduling an AM on a volatile Spot node.
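For example, to let the RM wait five minutes for an On-Demand node before falling back to a Spot node (the five-minute value is illustrative):

```properties
# Wait up to 5 minutes (300000 ms) before scheduling an AM on a Spot node.
yarn.scheduler.qubole.am-on-stable.timeout.ms=300000
```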

Mapping an AWS S3 Bucket to an AWS Region

Qubole supports mapping an AWS S3 bucket to an AWS Region on Hadoop 2 (Hive) clusters using the S3a filesystem. This feature is useful when Hadoop has to work with AWS S3 buckets in different AWS regions. It can be set as a Hadoop override in the REST API call for a Hadoop 2 (Hive) cluster or in the cluster UI configuration. For information on adding a Hadoop override through the UI, see Advanced configuration: Modifying Hadoop Cluster Settings.

For information on adding a Hadoop override through a REST API call, see hadoop_settings.

The parameter used as a Hadoop override for configuring S3-bucket-to-AWS-region mapping is fs.s3.awsBucketToRegionMapping.

Here is an example of the S3-bucket-to-AWS-region mapping configuration.

fs.s3.awsBucketToRegionMapping = {"acm-354-test": "s3.ap-northeast-2.amazonaws.com", "acm-920": "s3.ap-south-1.amazonaws.com"}
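Note that the value must be valid JSON mapping bucket names to regional S3 endpoints. A quick Python check (illustrative only) of the example above:

```python
import json

# The override value, exactly as it would appear after the '=' sign.
value = ('{"acm-354-test": "s3.ap-northeast-2.amazonaws.com", '
         '"acm-920": "s3.ap-south-1.amazonaws.com"}')

mapping = json.loads(value)  # raises ValueError if the value is not valid JSON
print(mapping["acm-920"])  # → s3.ap-south-1.amazonaws.com
```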