Friday, April 27, 2018

How to control the number of Mappers for a particular Hive query to optimize query performance


Mappers

In most cases, setting both mapreduce.input.fileinputformat.split.maxsize and mapreduce.input.fileinputformat.split.minsize to the same value controls the number of Mappers Hive uses when running a particular query; a rough rule of thumb is sketched after the examples below.
Example:
For a text file with a file size of 200000 bytes:
  • The following configurations trigger two Mappers for the MapReduce job:
set mapreduce.input.fileinputformat.split.maxsize=100000;
set mapreduce.input.fileinputformat.split.minsize=100000;

  • The following configurations trigger four Mappers for the MapReduce job:
set mapreduce.input.fileinputformat.split.maxsize=50000;
set mapreduce.input.fileinputformat.split.minsize=50000;
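
As a rough rule of thumb (an approximation, not an exact formula), the MapReduce framework launches about ceil(input size / split size) Mappers for a splittable text file, so to target a particular number of Mappers, set both properties to the input size divided by that target. The values below are illustrative only.

-- Illustrative only: target eight Mappers for the same 200000-byte file
-- ceil(200000 / 25000) = 8
set mapreduce.input.fileinputformat.split.maxsize=25000;
set mapreduce.input.fileinputformat.split.minsize=25000;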
 
By default, Hive assigns several small files, whose file sizes are smaller than mapreduce.input.fileinputformat.split.minsize, to a single Mapper to limit the number of Mappers initialized. Hive also considers the data locality of each file's HDFS blocks. If many small files are stored across different HDFS DataNodes, Hive will not combine them into a single Mapper, because they are not stored on the same machine.
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; (Default)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; (Does not combine files)
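
If small files spread across DataNodes still need to be combined, the per-node and per-rack minimum split sizes honored by CombineHiveInputFormat can be raised. The property names below come from Hadoop's CombineFileInputFormat; the 128 MB values are purely illustrative, not recommendations.

-- Illustrative values only: let CombineHiveInputFormat build larger combined splits
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- Minimum bytes to combine on a single node before falling back to rack level
set mapreduce.input.fileinputformat.split.minsize.per.node=134217728;
-- Minimum bytes to combine within a rack before combining across racks
set mapreduce.input.fileinputformat.split.minsize.per.rack=134217728;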

Reducers

Determining the number of Reducers often requires more input from the developer than determining the number of Mappers does. Efficient allocation of Reducers depends heavily on the Hive query and the input data set. In particular, the size of a query's result set against a large input data set depends largely on the query's predicates: a query that heavily filters the input data set may need only a few Reducers, while a query against the same data set with fewer filtering predicates may need many.

Hive employs a simple algorithm for determining the number of Reducers to use based on the total size of the input data set.  The number of Reducers can also be set explicitly by the developer.
 
-- In order to change the average load for a reducer (in bytes)
-- CDH uses a default of 67108864 (64 MB)
-- That is, if the total size of the data set is 1 GB, then 16 reducers will be used
set hive.exec.reducers.bytes.per.reducer=<number>;

-- In order to set a constant number of reducers
-- Typically set to a prime number close to the number of available hosts
-- Default value is -1
-- When set to -1, Hive automatically determines the number of reducers
set mapreduce.job.reduces=<number>;

-- Hive will use this as the maximum number of reducers when automatically determining the number of reducers
-- Default value is 1009
set hive.exec.reducers.max=<number>;
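
As a worked sketch of the automatic estimate (the exact formula may differ slightly between Hive versions), the Reducer count works out to roughly min(hive.exec.reducers.max, ceil(total input size / hive.exec.reducers.bytes.per.reducer)).

-- Illustrative only: with a 16 GB (17179869184-byte) input data set and the setting below,
-- Hive estimates ceil(17179869184 / 268435456) = 64 reducers,
-- subject to the hive.exec.reducers.max cap
set hive.exec.reducers.bytes.per.reducer=268435456;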


Special Cases

Certain queries do not use Reducers at all.  INSERT statements, for example, may generate Mapper-only MapReduce jobs.  If Reducers are required for performance reasons, or the developer would like to control the number of output files generated, workarounds are available.
 
-- Add an artificial LIMIT clause to force a Reducer phase
INSERT INTO <target_table> SELECT * FROM <source_table> LIMIT 99999999;

-- Use very large split sizes to force a single Mapper (entire data set is < 2 GB)
set mapreduce.input.fileinputformat.split.maxsize=2147483648;
set mapreduce.input.fileinputformat.split.minsize=2147483648;
INSERT INTO <target_table> SELECT * FROM <source_table>;

-- When enabled, the dynamic partitioning columns are globally sorted
-- This keeps only one record writer open per partition value in the Reducer, reducing memory pressure on Reducers
-- Introduces a Reducer phase into an otherwise Mapper-only MapReduce job
set hive.optimize.sort.dynamic.partition=true;
INSERT INTO <target_table> ... SELECT * FROM <source_table>;
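
Another option, sketched below with hypothetical table and column names, is to add a DISTRIBUTE BY clause; DISTRIBUTE BY forces a Reducer phase, and combined with mapreduce.job.reduces it also controls the number of output files.

-- Force a Reducer phase and control the number of output files
-- <target_table>, <source_table>, and <distribution_column> are placeholders
set mapreduce.job.reduces=16;
INSERT INTO <target_table> SELECT * FROM <source_table> DISTRIBUTE BY <distribution_column>;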



