Understanding Considerations to Move Existing MapReduce Jobs in Hive to Tez
You can move existing MapReduce jobs in Hive to Tez.
Tez can be enabled for the entire cluster at the bootstrap level or for individual queries at runtime by setting hive.execution.engine = tez. If administrators configure Tez for the entire cluster, individual queries can be reverted to MapReduce by setting hive.execution.engine = mr at the start of the job.
For more information on queries, see Running Hive Queries on Tez.
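For example, the engine can be switched at the session level from the Hive shell:
set hive.execution.engine=tez;
-- revert an individual query to MapReduce
set hive.execution.engine=mr;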
There are certain considerations that you need to understand before moving MapReduce jobs in Hive to Tez, which are described in these sub-topics:
Understanding Log Pane
The number of Tasks for each Mapper or Reducer Vertex is displayed in the Logs pane. The information is displayed as A (+B, -C) / D where:
A is the number of completed tasks
B is the number of running tasks
C is the number of failed tasks
D is the total number of tasks
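For instance, a vertex reported as 47 (+2, -1) / 50 has 47 completed tasks, 2 running tasks, 1 failed task, and 50 tasks in total (the figures are illustrative).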
Understanding Application Memory
Out-of-memory errors may occur when an exceptionally large number of tasks are executed in parallel or too many files are involved in the split computation. Managing the Application Master configuration can prevent these types of issues. This memory is controlled with tez.am.resource.memory.mb, and a good starting point for this value may be yarn.app.mapreduce.am.resource.mb. The heap available to the Application Master JVM is controlled with tez.am.launch.cmd-opts, which is typically set to 80% of tez.am.resource.memory.mb.
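A minimal sketch of these settings, assuming a 4 GB Application Master (the values are illustrative, not recommendations, and would normally be set in tez-site.xml or before the Tez session starts):
set tez.am.resource.memory.mb=4096;
set tez.am.launch.cmd-opts=-Xmx3276m;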
Understanding Container Memory
Container limitation issues may occur if the amount of memory required is more than what is available under the allocation policy. If this occurs, Tez throws an error indicating that it is killing the container in response to the demands of the container. The container size is set with hive.tez.container.size and must be a multiple of yarn.scheduler.minimum-allocation-mb. The child Java options are controlled through hive.tez.java.opts and should be set to approximately 80% of hive.tez.container.size.
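A minimal sketch, assuming a 4 GB container and a 1 GB YARN minimum allocation (illustrative values only):
set hive.tez.container.size=4096;
set hive.tez.java.opts=-Xmx3276m;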
Understanding Split Sizing
Split computation occurs in the Application Master; by default, the maximum split size is 1 GB and the minimum split size is 50 MB. You can modify the split sizing policy by adjusting tez.grouping.max-size and tez.grouping.min-size. Tez uses HiveInputFormat in conjunction with the grouping settings to ensure that the number of Mappers does not become a bottleneck. This differs from MapReduce, which uses CombineHiveInputFormat by default and can result in fewer Mapper tasks. As a result, it can be misleading to compare the number of Mapper tasks between MapReduce and Tez to gauge performance improvements.
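As an illustration, the grouping settings expressed in bytes, mirroring the stated defaults of 1 GB and 50 MB:
set tez.grouping.max-size=1073741824;
set tez.grouping.min-size=52428800;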
Enabling Split Pruning
To enable Split Pruning during the split computation, configure the following:
set hive.optimize.index.filter = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
Understanding Task Parallelism
The parallelism across the reducers is set by adjusting the average reducer size in bytes through hive.exec.reducers.bytes.per.reducer; as this value decreases, more reducers are introduced to distribute the load across tasks. The parallelism across the mappers is set through tez.grouping.split-waves, which indicates the ratio between the number of tasks per vertex and the number of available containers in the queue. As this value decreases, more parallelism is introduced but fewer resources are allocated to a single job.
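As a sketch, with illustrative values (1.7 is a commonly cited default for split waves; 268435456 bytes is 256 MB per reducer):
set hive.exec.reducers.bytes.per.reducer=268435456;
set tez.grouping.split-waves=1.7;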
Understanding Garbage Collection
While often inconsequential, the garbage collection process can increase run time as data types and queries become more complex. The amount of time spent in garbage collection can be identified through the Tez Application UI or by enabling hive.tez.exec.print.summary. If garbage collection times are higher than acceptable or expected, consider which components of the Hive functionality may be increasing the runtime.
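To print the execution summary (which, as noted above, can be used to identify time spent in garbage collection) after each query, assuming session-level configuration is acceptable:
set hive.tez.exec.print.summary=true;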
Understanding Map Join
When taking advantage of Map Joins in Hive, keep in mind that the larger and more complex the Hash Table used for the Map Join, the greater the burden on the Garbage Collection process. If the Map Join is necessary to avoid a Shuffle Join or is required for performance, it may be necessary to increase the container size so that additional resources are available for Garbage Collection. If the Map Join is not needed, consider disabling the automatic conversion or decreasing the value of hive.auto.convert.join.noconditionaltask.size to force the query to use a Shuffle Join.
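As a sketch, assuming the standard hive.auto.convert.join switch (not named above) for disabling the conversion; the size threshold shown is illustrative:
-- disable automatic Map Join conversion entirely
set hive.auto.convert.join=false;
-- or keep the conversion but lower the size threshold (in bytes)
set hive.auto.convert.join.noconditionaltask.size=10000000;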
Understanding ORC Insert
When inserting into a table that writes to an ORC file, if a large number of columns are present, consider reducing hive.exec.orc.default.buffer.size or increasing the container size.
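An illustrative session-level adjustment, assuming a reduction from the commonly used 256 KB default:
set hive.exec.orc.default.buffer.size=131072;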
Understanding Partition Insert
During partitioned inserts, performance may suffer if a large number of tasks insert into multiple partitions at the same time. If this is observed, consider enabling hive.optimize.sort.dynamic.partition. Only do this when inserting into more than 10 partitions, because it can have a negative impact on performance with a very small number of partitions.
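For example, at the session level:
set hive.optimize.sort.dynamic.partition=true;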
Understanding Hive Statistics
You can run the following command to trigger accurate size accounting by the compiler:
ANALYZE TABLE [table_name] COMPUTE STATISTICS FOR COLUMNS;
After executing the above statement, enable hive.stats.fetch.column.stats, which triggers the Hive physical optimizer to use more accurate per-column statistics instead of the uncompressed file size reported by HDFS. After collecting and calculating statistics, consider enabling the cost-based optimizer (CBO) with hive.cbo.enable.
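Putting the pieces together, using a hypothetical table named sales:
ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS;
set hive.stats.fetch.column.stats=true;
set hive.cbo.enable=true;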