How to confirm Dynamic Partition Pruning works in Impala

This article explains how to confirm Impala’s new Dynamic Partition Pruning feature is effective in CDH5.7.x.

Dynamic Partition Pruning is a new feature introduced in CDH5.7.x / Impala 2.5. With it, information about the partitions is collected at run time, and Impala prunes unnecessary partitions in ways that were impractical to predict in advance. The following test case shows that Dynamic Partition Pruning is working effectively.

1. Create tables with testing data:

CREATE TABLE yy (s string) PARTITIONED BY (year int) STORED AS PARQUET;
INSERT INTO yy PARTITION (year) VALUES ('1999', 1999), ('2000', 2000),
  ('2001', 2001), ('2010',2010);
COMPUTE STATS yy;

CREATE TABLE yy2 (s string) PARTITIONED BY (year int) STORED AS PARQUET;
INSERT INTO yy2 PARTITION (year) VALUES ('1999', 1999), ('2000', 2000),
  ('2001', 2001);
COMPUTE STATS yy2;

It is important to run COMPUTE STATS on those tables to make Dynamic Partition Pruning effective.
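
To double-check that the statistics are actually in place, here is a minimal sketch (assuming impala-shell is on the PATH and can reach an impalad; -q runs a statement non-interactively):

impala-shell -q "SHOW TABLE STATS yy"     # the #Rows column in the Total row should show 4, not -1
impala-shell -q "SHOW TABLE STATS yy2"    # the #Rows column in the Total row should show 3, not -1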

2. Run the following query:

Query: SELECT s FROM yy2 WHERE year IN (SELECT MIN(year) FROM yy LIMIT 1)
+------+
| s    |
+------+
| 1999 |
+------+
Fetched 1 row(s) in 0.55s

The result is returned as expected, because the minimum value of the “year” column in table “yy” is 1999, and that value is used in the WHERE condition to limit the result from table “yy2”.

3. Check the explain query:

Query: explain SELECT s FROM yy2 WHERE year IN (SELECT MIN(year) FROM yy LIMIT 1)
+----------------------------------------------------------+
| Explain String                                           |
+----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=16.00MB VCores=1 |
|                                                          |
| 07:EXCHANGE [UNPARTITIONED]                              |
| |                                                        |
| 03:HASH JOIN [LEFT SEMI JOIN, BROADCAST]                 |
| |  hash predicates: year = min(year)                     |
| |  runtime filters: RF000 <- min(year)                   |
| |                                                        |
| |--06:EXCHANGE [BROADCAST]                               |
| |  |                                                     |
| |  05:AGGREGATE [FINALIZE]                               |
| |  |  output: min:merge(year)                            |
| |  |  limit: 1                                           |
| |  |                                                     |
| |  04:EXCHANGE [UNPARTITIONED]                           |
| |  |                                                     |
| |  02:AGGREGATE                                          |
| |  |  output: min(year)                                  |
| |  |                                                     |
| |  01:SCAN HDFS [default.yy]                             |
| |     partitions=4/4 files=4 size=936B                   |
| |                                                        |
| 00:SCAN HDFS &#91;default.yy2&#93;                               |
|    partitions=3/3 files=3 size=702B                      |
|    runtime filters: RF000 -> year                        |
+----------------------------------------------------------+
Fetched 25 row(s) in 0.05s

We can see that table “yy2” was scheduled to be scanned through all 3 partitions (1999, 2000 and 2001). This is expected: at compile time, Impala does not know the result of “SELECT MIN(year) FROM yy LIMIT 1”, so it plans to scan all partitions of table “yy2”.

4. Let’s check the SUMMARY of the query:

Operator          #Hosts   Avg Time   Max Time  #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail                    
----------------------------------------------------------------------------------------------------------------------
07:EXCHANGE            1    0.000ns    0.000ns      1           1         0        -1.00 B  UNPARTITIONED             
03:HASH JOIN           3  180.999ms  259.000ms      1           1   2.02 MB         5.00 B  LEFT SEMI JOIN, BROADCAST 
|--06:EXCHANGE         3    0.000ns    0.000ns      1           1         0              0  BROADCAST                 
|  05:AGGREGATE        1  215.000ms  215.000ms      1           1  24.00 KB        -1.00 B  FINALIZE                  
|  04:EXCHANGE         1    0.000ns    0.000ns      3           1         0        -1.00 B  UNPARTITIONED             
|  02:AGGREGATE        3  147.333ms  168.000ms      3           1  20.00 KB       10.00 MB                            
|  01:SCAN HDFS        3   28.333ms   39.000ms      4           4  36.00 KB              0  default.yy            
00:SCAN HDFS           3    2.000ms    6.000ms      1           3  46.00 KB       16.00 MB  default.yy2

Check the last row, the SCAN HDFS operator for table “yy2”: “Est. #Rows” was 3, but the actual #Rows scanned was only 1. This confirms that Dynamic Partition Pruning is working, as only one partition/row was scanned instead of 3 partitions/rows.
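
To double-check that the difference really comes from the runtime filter, the query can be rerun with runtime filters switched off and the summaries compared. Below is a hedged sketch, assuming impala-shell can reach an impalad and that the RUNTIME_FILTER_MODE query option (introduced in Impala 2.5) is available; the file paths are just examples:

cat > /tmp/dpp_off.sql <<'EOF'
SET RUNTIME_FILTER_MODE=OFF;
SELECT s FROM yy2 WHERE year IN (SELECT MIN(year) FROM yy LIMIT 1);
EOF
# -p (--show_profiles) prints the query profile, which contains the exec summary
impala-shell -p -f /tmp/dpp_off.sql 2>&1 | grep "00:SCAN HDFS"
# with runtime filters off, the yy2 scan should report #Rows=3 instead of 1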

5. Another way to confirm that Dynamic Partition Pruning is effective is to check the Filter section of the SCAN operator for each node in the query PROFILE. In my test I have 3 hosts.

– Host one:

Filter 0
    - Files processed: 1
    - Files rejected: 0
    - Files total: 1

– Host two:

Filter 0
    - Files processed: 1
    - Files rejected: 1
    - Files total: 1

– Host three:

Filter 0
    - Files processed: 1
    - Files rejected: 1
    - Files total: 1

From the above, we can see that each host processed one file; however, two of them rejected their file due to Dynamic Partition Pruning.
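
To pull these counters out of the profile from the command line, here is a hedged sketch (again assuming impala-shell with its -p/--show_profiles option; the grep pattern simply matches the counter names shown above):

impala-shell -p -q "SELECT s FROM yy2 WHERE year IN (SELECT MIN(year) FROM yy LIMIT 1)" > /tmp/dpp_profile.txt 2>&1
# each scan fragment reports how many files its runtime filter processed and rejected
grep -A 3 "Filter 0" /tmp/dpp_profile.txt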

How to import BLOB data into HBase directly using Sqoop

Recently I was dealing with an issue where I was not able to import BLOB data correctly into HBase from an Oracle database. All other columns were imported successfully; however, the BLOB column failed to appear in the HBase table.

My test table has three columns: ID (INT), DATA_S (VARCHAR2) and DATA_B (BLOB). The following was the original command that failed to import the BLOB column:

sqoop import -Dsqoop.hbase.add.row.key=true \
    --connect jdbc:oracle:thin:@//<oracle-host>:1521/orcl \
    --username USERNAME --password password --table TEST_TABLE \
    --hbase-create-table --hbase-table test_case_no_bulkload \
    --column-family cf --split-by ID \
    --hbase-row-key ID

Result as below:

ROW          COLUMN+CELL
 1           column=cf:DATA_S, timestamp=1475219854006, value=Test
 1           column=cf:ID, timestamp=1475219854006, value=1
 2           column=cf:DATA_S, timestamp=1475219894990, value=Test MOre
 2           column=cf:ID, timestamp=1475219894990, value=2
2 row(s) in 0.7070 seconds

You can see that the DATA_B column was missing from the destination HBase table.

The fix here is to use bulkload for HBase; see the command below:

sqoop import -Dsqoop.hbase.add.row.key=true \
    --connect jdbc:oracle:thin:@//<oracle-host>:1521/orcl \
    --username USERNAME --password password --table TEST_TABLE \
    --hbase-create-table --hbase-table test_case_bulkload \
    --column-family cf --split-by ID \
    --hbase-row-key ID \
    --hbase-bulkload

See the new result:

hbase(main):002:0> scan 'test_case_bulkload'
ROW       COLUMN+CELL
 1        column=cf:DATA_B, timestamp=1475220177891, value=2f 72 6f 6f 74 2f 31 30 39 33 33 31 2f 6c 65 69 73 61 5f 63 68 72 69 73 74 6d 61 73 5f 66 61 6c 73 65 5f 63 6f 6c 6f 72 2e 70 6e 67
 1        column=cf:DATA_S, timestamp=1475220177891, value=Test
 2        column=cf:DATA_B, timestamp=1475220177891, value=41 6e 6f 74 68 65 72 20 74 65 73 74
 2        column=cf:DATA_S, timestamp=1475220177891, value=Test MOre
2 row(s) in 0.0620 seconds

You can see that column DATA_B has now been created in HBase. However, this leads to another problem: notice that the ID column is missing. We expect the ID column to be created as part of the column family because we specified “-Dsqoop.hbase.add.row.key=true” in the import command.

This is caused by a known issue reported by my colleague; see SQOOP-2952 for details.

Basically, when using --hbase-bulkload, -Dsqoop.hbase.add.row.key=true is ignored and the row key is not created as part of the column family in the new HBase table.

If you do not need the row key stored in the column family, then --hbase-bulkload is the solution for you; otherwise you will have to wait for SQOOP-2952 to be resolved.
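
As a quick way to check whether the row key made it into the column family, here is a hedged sketch (the table name comes from the example above; the HBase shell reads commands from standard input):

echo "scan 'test_case_bulkload'" | hbase shell | grep "column=cf:ID" \
    || echo "cf:ID is not stored in the column family"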

Hope this helps.

How to Index LZO files in Hadoop

Today I was trying to index an LZO file using the hadoop command:

hadoop jar /opt/cloudera/parcels/GPLEXTRAS-5.7.0-1.cdh5.7.0.p0.40/lib/hadoop/lib/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer /tmp/lzo_test

However, it failed with the following error:

16/09/10 03:05:51 INFO mapreduce.Job: Task Id : attempt_1473404927068_0005_m_000000_0, Status : FAILED
Error: java.lang.NullPointerException
       	at com.hadoop.mapreduce.LzoSplitRecordReader.initialize(LzoSplitRecordReader.java:50)
       	at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:548)
       	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:786)
       	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
       	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
       	at java.security.AccessController.doPrivileged(Native Method)
       	at javax.security.auth.Subject.doAs(Subject.java:415)
       	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
       	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

After some research, it turned out that I did not have the LZO codec enabled in my cluster. I did the following to resolve the issue:

– Add gplextras parcel to Cloudera Manager’s parcel configuration:
https://archive.cloudera.com/gplextras5/parcels/x.x.x

where x.x.x should match your CDH version; in my case it is 5.7.0.

– Install GPL Extras Parcel through Cloudera Manager as normal
– Add the following codecs to the Compression Codec (io.compression.codecs) setting in Cloudera Manager > HDFS > Configuration:

com.hadoop.compression.lzo.LzopCodec
com.hadoop.compression.lzo.LzoCodec

– Restart Cluster
– Deploy Client Configuration
– Install native-lzo library on all hosts in the cluster

sudo yum install lzop
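
If you need a sample LZO file to test the indexer against, here is a hedged sketch (file and directory names are just examples; lzop comes from the package installed above, and the hdfs command is assumed to be on the PATH):

seq 1 10000 > test.txt                       # generate some text data
lzop test.txt                                # compress it, producing test.txt.lzo
hdfs dfs -mkdir -p /tmp/lzo_test             # create the target HDFS directory
hdfs dfs -put test.txt.lzo /tmp/lzo_test/    # upload the file for DistributedLzoIndexer to index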

After the above changes, the index job should finish without issues:

hadoop jar /opt/cloudera/parcels/GPLEXTRAS-5.7.0-1.cdh5.7.0.p0.40/lib/hadoop/lib/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer /tmp/lzo_test
16/09/10 03:22:10 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
16/09/10 03:22:10 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 820f83d05b8d916e89dbb72d6ef129113b277303]
16/09/10 03:22:12 INFO lzo.DistributedLzoIndexer: Adding LZO file hdfs://host-10-17-101-195.coe.cloudera.com:8020/tmp/lzo_test/test.txt.lzo to indexing list (no index currently exists)
16/09/10 03:22:12 INFO Configuration.deprecation: mapred.map.tasks.speculative.execution is deprecated. Instead, use mapreduce.map.speculative
16/09/10 03:22:12 INFO client.RMProxy: Connecting to ResourceManager at host-10-17-101-195.coe.cloudera.com/10.17.101.195:8032
16/09/10 03:22:12 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 119 for hive on 10.17.101.195:8020
16/09/10 03:22:12 INFO security.TokenCache: Got dt for hdfs://host-10-17-101-195.coe.cloudera.com:8020; Kind: HDFS_DELEGATION_TOKEN, Service: 10.17.101.195:8020, Ident: (HDFS_DELEGATION_TOKEN token 119 for hive)
16/09/10 03:22:12 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/09/10 03:22:12 INFO input.FileInputFormat: Total input paths to process : 1
16/09/10 03:22:13 INFO mapreduce.JobSubmitter: number of splits:1
16/09/10 03:22:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1473502403576_0002
16/09/10 03:22:13 INFO mapreduce.JobSubmitter: Kind: HDFS_DELEGATION_TOKEN, Service: 10.17.101.195:8020, Ident: (HDFS_DELEGATION_TOKEN token 119 for hive)
16/09/10 03:22:13 INFO impl.YarnClientImpl: Submitted application application_1473502403576_0002
16/09/10 03:22:13 INFO mapreduce.Job: The url to track the job: http://host-10-17-101-195.coe.cloudera.com:8088/proxy/application_1473502403576_0002/
16/09/10 03:22:13 INFO mapreduce.Job: Running job: job_1473502403576_0002
16/09/10 03:22:22 INFO mapreduce.Job: Job job_1473502403576_0002 running in uber mode : false
16/09/10 03:22:22 INFO mapreduce.Job:  map 0% reduce 0%
16/09/10 03:22:35 INFO mapreduce.Job:  map 22% reduce 0%
16/09/10 03:22:38 INFO mapreduce.Job:  map 36% reduce 0%
16/09/10 03:22:41 INFO mapreduce.Job:  map 50% reduce 0%
16/09/10 03:22:44 INFO mapreduce.Job:  map 62% reduce 0%
16/09/10 03:22:47 INFO mapreduce.Job:  map 76% reduce 0%
16/09/10 03:22:50 INFO mapreduce.Job:  map 90% reduce 0%
16/09/10 03:22:53 INFO mapreduce.Job:  map 100% reduce 0%
16/09/10 03:22:53 INFO mapreduce.Job: Job job_1473502403576_0002 completed successfully
16/09/10 03:22:53 INFO mapreduce.Job: Counters: 30
       	File System Counters
       		FILE: Number of bytes read=0
       		FILE: Number of bytes written=118080
       		FILE: Number of read operations=0
       		FILE: Number of large read operations=0
       		FILE: Number of write operations=0
       		HDFS: Number of bytes read=85196
       		HDFS: Number of bytes written=85008
       		HDFS: Number of read operations=2
       		HDFS: Number of large read operations=0
       		HDFS: Number of write operations=4
       	Job Counters
       		Launched map tasks=1
       		Data-local map tasks=1
       		Total time spent by all maps in occupied slots (ms)=28846
       		Total time spent by all reduces in occupied slots (ms)=0
       		Total time spent by all map tasks (ms)=28846
       		Total vcore-seconds taken by all map tasks=28846
       		Total megabyte-seconds taken by all map tasks=29538304
       	Map-Reduce Framework
       		Map input records=10626
       		Map output records=10626
       		Input split bytes=138
       		Spilled Records=0
       		Failed Shuffles=0
       		Merged Map outputs=0
       		GC time elapsed (ms)=32
       		CPU time spent (ms)=3960
       		Physical memory (bytes) snapshot=233476096
       		Virtual memory (bytes) snapshot=1564155904
       		Total committed heap usage (bytes)=176160768
       	File Input Format Counters
       		Bytes Read=85058
       	File Output Format Counters
       		Bytes Written=0

How to redirect Parquet’s log messages to STDERR rather than STDOUT

This article explains the steps needed to redirect Parquet’s log messages from STDOUT to STDERR, so that the Hive query output is not polluted when the user wants to capture the result on the command line.

In Parquet’s code base, logging information is written directly to STDOUT. This can cause some applications to fail because those messages are captured along with the query result; see the example below:

1. Table with TEXT file format works as below:

$ test=`hive -e "SELECT * FROM default.test"` 

$ echo $test 
2 5 4 3 2 1 5 4 3 2

2. However, if you do the same thing for a Parquet table, the result is different:

$ test_parquet=`hive -e "SELECT * FROM default.test_parquet"` 

$ echo $test_parquet 
2 5 4 3 2 1 5 4 3 2 16/08/2016 5:55:32 PM WARNING: parquet.hadoop.ParquetRecordReader: 
Can not initialize counter due to context is not a instance of TaskInputOutputContext, 
but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
16/08/2016 5:55:32 PM INFO: parquet.hadoop.InternalParquetRecordReader: 
RecordReader initialized will read a total of 10 records. 16/08/2016 5:55:32 PM 
INFO: parquet.hadoop.InternalParquetRecordReader: at row 0. reading next block 16/08/2016 5:55:32 PM 
INFO: parquet.hadoop.InternalParquetRecordReader: block read in memory in 15 ms. row count = 10

So if an application tries to use the variable $test_parquet, it will run into issues due to those WARNING and INFO messages.

This problem has been reported in the upstream JIRA HIVE-13954; however, at the time of writing (CDH5.8.1), the fix has not been backported into CDH yet.

To work around the problem, follow the steps below:

  1. Save the content of the following to a file:
    #===============
    parquet.handlers= java.util.logging.ConsoleHandler
    .level=INFO
    java.util.logging.ConsoleHandler.level=INFO
    java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
    java.util.logging.SimpleFormatter.format=[%1$tc] %4$s: %2$s - %5$s %6$s%n
    #===============
    

    and put it anywhere you like on the client machine where you will run the Hive CLI; in my test I put it under /tmp/parquet/parquet-logging2.properties

  2. Run the following command in the shell before you run the Hive CLI:
    export HADOOP_CLIENT_OPTS="-Djava.util.logging.config.file=/tmp/parquet/parquet-logging2.properties"
    

    please change the path to the properties file accordingly

  3. Run your Hive CLI command:

    test_parquet=`hive -e "SELECT * FROM default.test_parquet"`
    

    the output will be saved in “$test_parquet” as expected; a quick verification sketch follows below
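
As a quick verification, here is a hedged sketch, assuming the properties file from step 1 sits at the example path: with the workaround in place, the Parquet messages go to STDERR, so they can be discarded without touching the query result.

export HADOOP_CLIENT_OPTS="-Djava.util.logging.config.file=/tmp/parquet/parquet-logging2.properties"
test_parquet=`hive -e "SELECT * FROM default.test_parquet" 2>/dev/null`   # discard STDERR, which now carries the Parquet messages
echo $test_parquet                                                        # should print only the row data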

Note: Please be advised that the Hive CLI is now deprecated. We strongly advise connecting to HiveServer2 through the JDBC or ODBC driver to get proper results; we do not recommend parsing the Hive CLI output.

Note: Please also be advised that this workaround only works with the CDH version of Hive, as the upstream version uses different package names for the Parquet classes than CDH does.

Beeline options need to be placed before “-e” option

Recently I needed to deal with an issue where a user specified “--incremental=true” as a beeline command line option, because beeline was failing with an OutOfMemory error when fetching results from HiveServer2. This option should help with the OOM problem; however, it did not in this particular case. The command was run as below:

beeline --hiveconf mapred.job.queue.name=queue_name --silent=true \
    -u 'jdbc:hive2://<hs2-host>:10000/default;principal=hive/<hs2-host>@<REALM>' \
    --outputformat=csv2 --silent=true -e 'select * from table_name' \
    --incremental=true > output.csv

It failed with the following error:

org.apache.thrift.TException: Error in calling method FetchResults
    at org.apache.hive.jdbc.HiveConnection$SynchronizedHandler.invoke(HiveConnection.java:1271)
    at com.sun.proxy.$Proxy8.FetchResults(Unknown Source)
    at org.apache.hive.jdbc.HiveQueryResultSet.next(HiveQueryResultSet.java:363)
    at org.apache.hive.beeline.BufferedRows.<init>(BufferedRows.java:42)
    at org.apache.hive.beeline.BeeLine.print(BeeLine.java:1756)
    at org.apache.hive.beeline.Commands.execute(Commands.java:826)
    at org.apache.hive.beeline.Commands.sql(Commands.java:670)
    at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:974)
    at org.apache.hive.beeline.BeeLine.initArgs(BeeLine.java:716)
    at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:753)
    at org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:480)
    at org.apache.hive.beeline.BeeLine.main(BeeLine.java:463)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Double.valueOf(Double.java:521)
    at org.apache.hive.service.cli.thrift.TDoubleColumn$TDoubleColumnStandardScheme.read(TDoubleColumn.java:454)
    at org.apache.hive.service.cli.thrift.TDoubleColumn$TDoubleColumnStandardScheme.read(TDoubleColumn.java:433)
    at org.apache.hive.service.cli.thrift.TDoubleColumn.read(TDoubleColumn.java:367)
    at org.apache.hive.service.cli.thrift.TColumn.standardSchemeReadValue(TColumn.java:318)
    at org.apache.thrift.TUnion$TUnionStandardScheme.read(TUnion.java:224)
    at org.apache.thrift.TUnion$TUnionStandardScheme.read(TUnion.java:213)
    at org.apache.thrift.TUnion.read(TUnion.java:138)
    at org.apache.hive.service.cli.thrift.TRowSet$TRowSetStandardScheme.read(TRowSet.java:573)
    at org.apache.hive.service.cli.thrift.TRowSet$TRowSetStandardScheme.read(TRowSet.java:525)
    at org.apache.hive.service.cli.thrift.TRowSet.read(TRowSet.java:451)
    at org.apache.hive.service.cli.thrift.TFetchResultsResp$TFetchResultsRespStandardScheme.read(TFetchResultsResp.java:518)
    at org.apache.hive.service.cli.thrift.TFetchResultsResp$TFetchResultsRespStandardScheme.read(TFetchResultsResp.java:486)
    at org.apache.hive.service.cli.thrift.TFetchResultsResp.read(TFetchResultsResp.java:408)
    at org.apache.hive.service.cli.thrift.TCLIService$FetchResults_result$FetchResults_resultStandardScheme.read(TCLIService.java:13171)
    at org.apache.hive.service.cli.thrift.TCLIService$FetchResults_result$FetchResults_resultStandardScheme.read(TCLIService.java:13156)
    at org.apache.hive.service.cli.thrift.TCLIService$FetchResults_result.read(TCLIService.java:13103)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
    at org.apache.hive.service.cli.thrift.TCLIService$Client.recv_FetchResults(TCLIService.java:501)
    at org.apache.hive.service.cli.thrift.TCLIService$Client.FetchResults(TCLIService.java:488)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hive.jdbc.HiveConnection$SynchronizedHandler.invoke(HiveConnection.java:1263)
    at com.sun.proxy.$Proxy8.FetchResults(Unknown Source)
    at org.apache.hive.jdbc.HiveQueryResultSet.next(HiveQueryResultSet.java:363)
    at org.apache.hive.beeline.BufferedRows.<init>(BufferedRows.java:42)
    at org.apache.hive.beeline.BeeLine.print(BeeLine.java:1756)
    at org.apache.hive.beeline.Commands.execute(Commands.java:826)
    at org.apache.hive.beeline.Commands.sql(Commands.java:670)
    at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:974)
    at org.apache.hive.beeline.BeeLine.initArgs(BeeLine.java:716)
Error: org.apache.thrift.TApplicationException: CloseOperation failed: out of sequence response (state=08S01,code=0)
Error: Error while cleaning up the server resources (state=,code=0)

From this stack trace, we can see that the class “BufferedRows” was used; however, if “--incremental=true” had taken effect, the class “IncrementalRows” would have been used instead. This confirmed that the “--incremental=true” option was not applied.

After further experimenting, I figured out that “--incremental=true” needs to go before the “-e” option for it to take effect. So running the command as below:

beeline --hiveconf mapred.job.queue.name=queue_name --silent=true \
    -u 'jdbc:hive2://<hs2-host>:10000/default;principal=hive/<hs2-host>@<REALM>' \
    --outputformat=csv2 --silent=true --incremental=true \
    -e 'select * from table_name' > output.csv

resolved the issue. I did not look into the details of why, but this should help anyone who runs into similar issues.