Impala Failed to Read Parquet Decimal Data

If you use the Hive JDBC/ODBC driver to create Parquet data via Hive, and you see the following error when querying the data through Impala:

File 'hdfs://namespace1/path/to/parquet/file/15110390563421' column 'column1' has a scale that does not match the table metadata scale.
File metadata scale: 0 Table metadata scale: 2

It is most likely that you have hit a known issue: HIVE-15519.

Due to this bug, the DECIMAL scale value returned from the driver is incorrect (it returns null), so the data is written to the Parquet file with the wrong scale for those columns. Impala then complains that the column's definition in the table metadata does not match the column type stored in the Parquet file, because the scale values differ.

Currently there is no workaround available for this issue; a patch is required, or you will have to wait for it to be fixed. At the time of writing, the latest Cloudera CDH release is 5.13.1.
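To confirm that you are hitting this, you can compare the scale stored in the file against the table definition. Below is a minimal sketch using the `parquet-tools` CLI; the file path is a placeholder, and the availability of `parquet-tools` on the client machine is an assumption:

```shell
# Placeholder path: point this at a local copy of one of the affected files
PARQ_FILE=/tmp/suspect_file.parq

if command -v parquet-tools >/dev/null 2>&1; then
  # "meta" prints per-column metadata, including the DECIMAL scale
  # written into the file; compare it against DESCRIBE output in Impala
  parquet-tools meta "$PARQ_FILE"
else
  echo "parquet-tools not found on this machine"
fi
```

If the scale printed for the column differs from the scale shown by DESCRIBE in Impala, you are seeing the mismatch described above.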

Impala query failed with error: “Incompatible Parquet Schema”

Yesterday I was dealing with an issue where a very simple Impala SELECT query failed with an “Incompatible Parquet schema” error. I confirmed that the following workflow triggers the error:

  1. A Parquet file is created by an external library
  2. The Parquet file is loaded into a Hive/Impala table
  3. Querying the table through Impala fails with the error message below:

    incompatible Parquet schema for column 'db_name.tbl_name.col_name'. 
    Column type: DECIMAL(19, 0), Parquet schema:\noptional byte_array col_name [i:2 d:1 r:0]
    
  4. The same query works well in Hive

This is because Impala currently does not support all of the DECIMAL representations that Parquet allows. Parquet supports the following representations:

  • int32: for 1 <= precision <= 9
  • int64: for 1 <= precision <= 18; precision < 10 will produce a warning
  • fixed_len_byte_array: precision is limited by the array size. Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits
  • binary: precision is not limited, but is required. The minimum number of bytes to store the unscaled value should be used.

Please refer to Parquet Logical Type Definitions page for details.

However, Impala only supports fixed_len_byte_array and none of the others. This has been reported in the upstream JIRA: IMPALA-2494.

The only workaround for now is to create a Parquet file that uses a supported representation for the DECIMAL column, or simply to create the Parquet file through either Hive or Impala.
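As a sketch of the second workaround, you can rewrite the externally generated data through Hive, which re-encodes the DECIMAL column in a representation Impala can read. The table and column names below are placeholders, and DECIMAL(19, 0) is taken from the error above:

```sql
-- Re-encode the DECIMAL column by rewriting the data through Hive;
-- the resulting Parquet files use a representation Impala supports
CREATE TABLE tbl_name_rewritten STORED AS PARQUET AS
SELECT CAST(col_name AS DECIMAL(19, 0)) AS col_name
FROM tbl_name;
```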

Impala Reports Corrupt Parquet File After Failing With an OutOfMemory Error

Recently I was dealing with an issue where Impala reported a corrupt Parquet file after a query failed with an OutOfMemory error; if the query does not fail, no corruption is reported.

See the error message reported in the Impala Daemon logs below:

Memory limit exceeded
HdfsParquetScanner::ReadDataPage() failed to allocate 65535 bytes for decompressed data.
Corrupt Parquet file 'hdfs://nameservice1/path/to/file/914164e7120e6076-cdae1be60000001f_169433548_data.0.parq': column 'client_ord_id' had 1024 remaining values but expected 0 _
[Executed: 4/29/2017 5:28:58 AM] [Execution: 588ms]

This has been reported in the upstream JIRA IMPALA-5197, and it can happen in the following scenarios:

  • The query failed with an OOM error
  • There is a LIMIT clause in the query
  • The query was manually cancelled by the user

Those corruption messages do not mean the file is really corrupted; they are caused by the Impala bug mentioned above, IMPALA-5197.

If it is caused by OutOfMemory error, simply increase the memory limit for the query and try again:

SET MEM_LIMIT=10g;

For the other two causes, we will need to wait for IMPALA-5197 to be fixed.

Update: IMPALA-5197 has been fixed since CDH 5.12.0, as well as in CDH 5.10.2, CDH 5.9.3 and CDH 5.11.2.

How to redirect Parquet’s log messages into STDERR rather than STDOUT

This article explains the steps needed to redirect Parquet’s log messages from STDOUT to STDERR, so that Hive’s output is not polluted should the user want to capture the query result on the command line.

In Parquet’s code base, logging information is written directly to STDOUT. This can cause some applications to fail, because those messages are captured together with the query results; see the example below:

1. A table with TEXT file format works as expected:

$ test=`hive -e "SELECT * FROM default.test"` 

$ echo $test 
2 5 4 3 2 1 5 4 3 2

2. However, if you do the same thing with a Parquet table, the result is different:

$ test_parquet=`hive -e "SELECT * FROM default.test_parquet"` 

$ echo $test_parquet 
2 5 4 3 2 1 5 4 3 2 16/08/2016 5:55:32 PM WARNING: parquet.hadoop.ParquetRecordReader: 
Can not initialize counter due to context is not a instance of TaskInputOutputContext, 
but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
16/08/2016 5:55:32 PM INFO: parquet.hadoop.InternalParquetRecordReader: 
RecordReader initialized will read a total of 10 records. 16/08/2016 5:55:32 PM 
INFO: parquet.hadoop.InternalParquetRecordReader: at row 0. reading next block 16/08/2016 5:55:32 PM 
INFO: parquet.hadoop.InternalParquetRecordReader: block read in memory in 15 ms. row count = 10

So if an application tries to use the variable $test_parquet, those WARNING and INFO messages will cause issues.
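The underlying shell behaviour can be illustrated without Hive at all: command substitution captures only STDOUT, so anything a program writes to STDERR never ends up in the variable. This is why moving Parquet's messages to STDERR fixes the problem. The `fake_query` function below is a hypothetical stand-in for `hive -e ...`:

```shell
# Stand-in for "hive -e ...": prints a result on stdout and a log
# message on stderr, like Hive with the Parquet logging fix applied
fake_query() {
  echo "2 5 4 3 2 1 5 4 3 2"
  echo "INFO: parquet.hadoop.InternalParquetRecordReader: ..." 1>&2
}

# Command substitution captures only stdout, so the variable stays clean;
# stderr is discarded here, but even without the redirect it would not
# be captured into the variable
result=$(fake_query 2>/dev/null)
echo "$result"
```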

This problem has been reported in the upstream JIRA HIVE-13954; however, at the time of writing (CDH 5.8.1), the fix has not been backported into CDH yet.

To work around the problem, follow the steps below:

  1. Save the content of the following to a file:

    #===============
    parquet.handlers=java.util.logging.ConsoleHandler
    .level=INFO
    java.util.logging.ConsoleHandler.level=INFO
    java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
    java.util.logging.SimpleFormatter.format=[%1$tc] %4$s: %2$s - %5$s %6$s%n
    #===============
    

    and put it anywhere you like on the client machine where you will run the Hive CLI; in my test I put it under /tmp/parquet/parquet-logging2.properties

  2. Run the following command in your shell before you start the Hive CLI:

    export HADOOP_CLIENT_OPTS="-Djava.util.logging.config.file=/tmp/parquet/parquet-logging2.properties"
    

    Change the path to the properties file accordingly.

  3. Run your Hive CLI command:

    test_parquet=`hive -e "SELECT * FROM default.test_parquet"`
    

    The output will be saved in “$test_parquet” as expected.
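The three steps can be combined into a single shell session. This is a sketch: the properties path is just the example location used above, and the final `hive` invocation (commented out) assumes a configured CDH client:

```shell
# Step 1: write the java.util.logging config; ConsoleHandler writes to
# stderr, which is what routes Parquet's messages away from stdout
mkdir -p /tmp/parquet
cat > /tmp/parquet/parquet-logging2.properties <<'EOF'
parquet.handlers=java.util.logging.ConsoleHandler
.level=INFO
java.util.logging.ConsoleHandler.level=INFO
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tc] %4$s: %2$s - %5$s %6$s%n
EOF

# Step 2: point the Hadoop client JVM at the config
export HADOOP_CLIENT_OPTS="-Djava.util.logging.config.file=/tmp/parquet/parquet-logging2.properties"

# Step 3: run the query; requires a working Hive client, so commented out here
# test_parquet=$(hive -e "SELECT * FROM default.test_parquet")
```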

Note: Please be advised that the Hive CLI is now deprecated; we strongly advise connecting to HiveServer2 through the JDBC or ODBC driver instead, and we do not recommend parsing results from the Hive CLI output.

Note: Please also be aware that this workaround only works with the CDH version of Hive, as the upstream version uses different package names for the Parquet classes than CDH does.

Unable to query Hive parquet table after altering column type

Currently Hive does not support changing column types for Parquet tables; a proposed fix was rejected due to performance concerns (see below). I developed the following test case to demonstrate the bug:

DROP TABLE IF EXISTS test;
CREATE TABLE test (a INT, b DOUBLE) STORED AS PARQUET;
INSERT OVERWRITE TABLE test VALUES (1000, 1000);
SELECT * FROM test;
ALTER TABLE test CHANGE a a DOUBLE;
SELECT * FROM test;

The following is the output:

0: jdbc:hive2://10.17.80.41:10000/default> drop table if exists test;
No rows affected (0.408 seconds)
0: jdbc:hive2://10.17.80.41:10000/default> create table test (a int, b double) stored as parquet;
No rows affected (0.28 seconds)
0: jdbc:hive2://10.17.80.41:10000/default> insert overwrite table test values (1000, 1000);
INFO  : Number of reduce tasks is set to 0 since there's no reduce operator
WARN  : Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
INFO  : number of splits:1
INFO  : Submitting tokens for job: job_1444272676566_0015
INFO  : Kind: HDFS_DELEGATION_TOKEN, Service: 10.17.80.40:8020, Ident: (HDFS_DELEGATION_TOKEN token 78 for hive)
INFO  : The url to track the job: http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0015/
INFO  : Starting Job = job_1444272676566_0015, Tracking URL = http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0015/
INFO  : Kill Command = /opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hadoop/bin/hadoop job  -kill job_1444272676566_0015
INFO  : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
INFO  : 2015-10-08 04:14:07,188 Stage-1 map = 0%,  reduce = 0%
INFO  : 2015-10-08 04:14:17,845 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.49 sec
INFO  : MapReduce Total cumulative CPU time: 3 seconds 490 msec
INFO  : Ended Job = job_1444272676566_0015
INFO  : Stage-4 is selected by condition resolver.
INFO  : Stage-3 is filtered out by condition resolver.
INFO  : Stage-5 is filtered out by condition resolver.
INFO  : Moving data to: hdfs://host-10-17-80-40.coe.cloudera.com:8020/user/hive/warehouse/test/.hive-staging_hive_2015-10-08_04-13-54_259_91084841227249311-1/-ext-10000 from 
hdfs://host-10-17-80-40.coe.cloudera.com:8020/user/hive/warehouse/test/.hive-staging_hive_2015-10-08_04-13-54_259_91084841227249311-1/-ext-10002
INFO  : Loading data to table default.test from hdfs://host-10-17-80-40.coe.cloudera.com:8020/user/hive/warehouse/test/.hive-staging_hive_2015-10-08_04-13-54_259_91084841227249311-1/-ext-10000
INFO  : Table default.test stats: [numFiles=1, numRows=1, totalSize=272, rawDataSize=2]
No rows affected (25.223 seconds)
0: jdbc:hive2://10.17.80.41:10000/default> select * from test;
INFO  : Number of reduce tasks is set to 0 since there's no reduce operator
WARN  : Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
INFO  : number of splits:1
INFO  : Submitting tokens for job: job_1444272676566_0016
INFO  : Kind: HDFS_DELEGATION_TOKEN, Service: 10.17.80.40:8020, Ident: (HDFS_DELEGATION_TOKEN token 79 for hive)
INFO  : The url to track the job: http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0016/
INFO  : Starting Job = job_1444272676566_0016, Tracking URL = http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0016/
INFO  : Kill Command = /opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hadoop/bin/hadoop job  -kill job_1444272676566_0016
INFO  : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
INFO  : 2015-10-08 04:14:36,141 Stage-1 map = 0%,  reduce = 0%
INFO  : 2015-10-08 04:14:46,540 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.82 sec
INFO  : MapReduce Total cumulative CPU time: 2 seconds 820 msec
INFO  : Ended Job = job_1444272676566_0016
+-------+---------+--+
|   a   |    b    |
+-------+---------+--+
| 1000  | 1000.0  |
+-------+---------+--+
1 row selected (28.204 seconds)
0: jdbc:hive2://10.17.80.41:10000/default> alter table test change a a double;
No rows affected (0.378 seconds)
0: jdbc:hive2://10.17.80.41:10000/default> select * from test;
INFO  : Number of reduce tasks is set to 0 since there's no reduce operator
WARN  : Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
INFO  : number of splits:1
INFO  : Submitting tokens for job: job_1444272676566_0017
INFO  : Kind: HDFS_DELEGATION_TOKEN, Service: 10.17.80.40:8020, Ident: (HDFS_DELEGATION_TOKEN token 81 for hive)
INFO  : The url to track the job: http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0017/
INFO  : Starting Job = job_1444272676566_0017, Tracking URL = http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0017/
INFO  : Kill Command = /opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hadoop/bin/hadoop job  -kill job_1444272676566_0017
INFO  : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
INFO  : 2015-10-08 04:15:04,794 Stage-1 map = 0%,  reduce = 0%
INFO  : 2015-10-08 04:15:39,006 Stage-1 map = 100%,  reduce = 0%
ERROR : Ended Job = job_1444272676566_0017 with errors
Error: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask (state=08S01,code=2)
0: jdbc:hive2://10.17.80.41:10000/default>

After some research, I found the following JIRAs for this issue: HIVE-6784 and HIVE-12080.

The original issue, HIVE-6784, was created a while back; however, due to concerns about the performance impact of the patch, the upstream developers rejected it, and the issue was never fixed.

A newer JIRA, HIVE-12080, was created more recently; it is essentially the same as HIVE-6784, but requests that the issue be fixed without introducing any performance hit.

So, at the time of writing, there is no fix for this problem yet. The workaround is to re-generate the table by running “INSERT OVERWRITE {table_name} SELECT * FROM {table_name}” so that all data is rewritten with the new type. For the time being, just re-generate the table and you will be good to go.
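Applied to the test table above, the re-generation can also be done through a staging table, which avoids reading the table after its metadata already disagrees with the Parquet files. This is only a sketch of one possible variant; the staging table name is hypothetical, and it assumes the ALTER has not yet been run (or has been reverted):

```sql
-- Build a staging table that already has the new column type; the
-- original table is still readable here because its metadata and the
-- Parquet file types still agree
CREATE TABLE test_staged (a DOUBLE, b DOUBLE) STORED AS PARQUET;
INSERT OVERWRITE TABLE test_staged SELECT CAST(a AS DOUBLE), b FROM test;

-- Swap the staging table in once the copy is verified
DROP TABLE test;
ALTER TABLE test_staged RENAME TO test;
```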