Impala query failed with error: “Incompatible Parquet Schema”

Yesterday I was dealing with an issue where a very simple Impala SELECT query failed with an “Incompatible Parquet schema” error. I confirmed that the following workflow triggers the error:

  1. A Parquet file is created by an external library
  2. The Parquet file is loaded into a Hive/Impala table
  3. Querying the table through Impala fails with the error message below:
    incompatible Parquet schema for column 'db_name.tbl_name.col_name'. 
    Column type: DECIMAL(19, 0), Parquet schema:\noptional byte_array col_name [i:2 d:1 r:0]
  4. The same query works well in Hive

This is because Impala currently does not support all of the DECIMAL representations that Parquet does. Parquet supports the following physical types for DECIMAL columns:

  • int32: for 1 <= precision <= 9
  • int64: for 1 <= precision <= 18; precision < 10 will produce a warning
  • fixed_len_byte_array: precision is limited by the array size. Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits
  • binary: precision is not limited, but is required. The minimum number of bytes to store the unscaled value should be used.

Please refer to the Parquet Logical Type Definitions page for details.

However, Impala only supports fixed_len_byte_array and none of the others. This has been reported in the upstream JIRA IMPALA-2494.

The only workaround for now is to generate Parquet files that use the supported representation (fixed_len_byte_array) for DECIMAL columns, or simply to create the Parquet files through Hive or Impala instead of the external library.
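For example, here is a minimal sketch of the second option, rebuilding the table through Hive, which can still read the externally generated files. The table and column names come from the error message above; tbl_name_fixed is a hypothetical replacement table, so adjust the column list to match your actual schema.

-- Run through Hive (e.g. beeline), since Hive can still read the original files
CREATE TABLE db_name.tbl_name_fixed (col_name DECIMAL(19,0)) STORED AS PARQUET;
INSERT OVERWRITE TABLE db_name.tbl_name_fixed
SELECT col_name FROM db_name.tbl_name;

The files written by Hive should store the DECIMAL column as fixed_len_byte_array, so Impala can then query the new table.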

Impala Reported Corrupt Parquet File After Failing With OutOfMemory Error

Recently I was dealing with an issue where Impala reported a corrupt Parquet file after a query failed with an OutOfMemory error; if the query does not fail, no corruption is reported.

See the error message reported in the Impala Daemon logs below:

Memory limit exceeded
HdfsParquetScanner::ReadDataPage() failed to allocate 65535 bytes for decompressed data.

When the Impala query failed with the OOM error, it also reported a corrupt Parquet file:

Corrupt Parquet file 'hdfs://nameservice1/path/to/file/914164e7120e6076-cdae1be60000001f_169433548_data.0.parq': column 'client_ord_id' had 1024 remaining values but expected 0 _
[Executed: 4/29/2017 5:28:58 AM] [Execution: 588ms]

This has been reported in the upstream JIRA IMPALA-5197, and it can happen in the following scenarios:

  • Query failed with OOM error
  • There is a LIMIT clause in the query
  • Query is manually cancelled by the user

Those corruption messages do not mean the file is really corrupted; they are caused by the Impala bug mentioned above, IMPALA-5197.

If it is caused by an OutOfMemory error, simply increase the memory limit for the query and try again:

SET MEM_LIMIT=10g;
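For example, a minimal sketch of the full sequence in impala-shell or Hue; the 10g value is only illustrative, and db_name.tbl_name stands in for whatever query previously hit the memory limit:

-- Raise the per-query memory limit for the current session
SET MEM_LIMIT=10g;
-- Re-run the query that previously failed with "Memory limit exceeded"
SELECT COUNT(*) FROM db_name.tbl_name;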

For the other two causes, we will need to wait for IMPALA-5197 to be fixed.

How to redirect Parquet’s log messages to STDERR rather than STDOUT

This article explains the steps needed to redirect Parquet’s log messages from STDOUT to STDERR, so that Hive’s query output is not polluted when the user wants to capture the result on the command line.

Parquet’s code base writes its logging information directly to STDOUT, which can cause problems for applications because those messages are captured together with the query output. See the example below:

1. A table with the TEXT file format works as expected:

$ test=`hive -e "SELECT * FROM default.test"` 

$ echo $test 
2 5 4 3 2 1 5 4 3 2

2. However, if you do the same thing with a Parquet table, the result is different:

$ test_parquet=`hive -e "SELECT * FROM default.test_parquet"` 

$ echo $test_parquet 
2 5 4 3 2 1 5 4 3 2 16/08/2016 5:55:32 PM WARNING: parquet.hadoop.ParquetRecordReader: 
Can not initialize counter due to context is not a instance of TaskInputOutputContext, 
but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
16/08/2016 5:55:32 PM INFO: parquet.hadoop.InternalParquetRecordReader: 
RecordReader initialized will read a total of 10 records. 16/08/2016 5:55:32 PM 
INFO: parquet.hadoop.InternalParquetRecordReader: at row 0. reading next block 16/08/2016 5:55:32 PM 
INFO: parquet.hadoop.InternalParquetRecordReader: block read in memory in 15 ms. row count = 10

So if an application tries to use the variable $test_parquet, those WARNING and INFO messages will cause issues.

This problem has been reported in the upstream JIRA HIVE-13954; however, at the time of writing (CDH 5.8.1), the fix has not been backported into CDH yet.

To work around the problem, follow the steps below:

  1. Save the following content to a file:
    #===============
    parquet.handlers= java.util.logging.ConsoleHandler
    .level=INFO
    java.util.logging.ConsoleHandler.level=INFO
    java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
    java.util.logging.SimpleFormatter.format=[%1$tc] %4$s: %2$s - %5$s %6$s%n
    #===============
    

    and put it anywhere you like on the client machine where you will run the Hive CLI; in my test I put it under /tmp/parquet/parquet-logging2.properties

  2. Run the following command in the shell before you run the Hive CLI:
    export HADOOP_CLIENT_OPTS="-Djava.util.logging.config.file=/tmp/parquet/parquet-logging2.properties"
    

    please change the path to the properties file accordingly

  3. run your Hive CLI command:

    test_parquet=`hive -e "SELECT * FROM default.test_parquet"`
    

    the output will be saved in “$test_parquet” as expected

Note: Please be advised that the Hive CLI is now deprecated; we strongly advise connecting to HiveServer2 through a JDBC or ODBC driver to get proper results, rather than parsing output from the Hive CLI.

Note: Please also be advised that this workaround only works with the CDH version of Hive, as the upstream Parquet classes use different package names than the ones shipped in CDH.

Unable to query Hive parquet table after altering column type

Currently Hive does not support changing column types for Parquet tables, due to performance concerns around the proposed fix. I have put together the following test case to demonstrate the bug:

DROP TABLE IF EXISTS test;
CREATE TABLE test (a INT, b DOUBLE) STORED AS PARQUET;
INSERT OVERWRITE TABLE test VALUES (1000, 1000);
SELECT * FROM test;
ALTER TABLE test CHANGE a a DOUBLE;
SELECT * FROM test;

The following is the output:

0: jdbc:hive2://10.17.80.41:10000/default> drop table if exists test;
No rows affected (0.408 seconds)
0: jdbc:hive2://10.17.80.41:10000/default> create table test (a int, b double) stored as parquet;
No rows affected (0.28 seconds)
0: jdbc:hive2://10.17.80.41:10000/default> insert overwrite table test values (1000, 1000);
INFO  : Number of reduce tasks is set to 0 since there's no reduce operator
WARN  : Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
INFO  : number of splits:1
INFO  : Submitting tokens for job: job_1444272676566_0015
INFO  : Kind: HDFS_DELEGATION_TOKEN, Service: 10.17.80.40:8020, Ident: (HDFS_DELEGATION_TOKEN token 78 for hive)
INFO  : The url to track the job: http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0015/
INFO  : Starting Job = job_1444272676566_0015, Tracking URL = http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0015/
INFO  : Kill Command = /opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hadoop/bin/hadoop job  -kill job_1444272676566_0015
INFO  : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
INFO  : 2015-10-08 04:14:07,188 Stage-1 map = 0%,  reduce = 0%
INFO  : 2015-10-08 04:14:17,845 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.49 sec
INFO  : MapReduce Total cumulative CPU time: 3 seconds 490 msec
INFO  : Ended Job = job_1444272676566_0015
INFO  : Stage-4 is selected by condition resolver.
INFO  : Stage-3 is filtered out by condition resolver.
INFO  : Stage-5 is filtered out by condition resolver.
INFO  : Moving data to: hdfs://host-10-17-80-40.coe.cloudera.com:8020/user/hive/warehouse/test/.hive-staging_hive_2015-10-08_04-13-54_259_91084841227249311-1/-ext-10000 from 
hdfs://host-10-17-80-40.coe.cloudera.com:8020/user/hive/warehouse/test/.hive-staging_hive_2015-10-08_04-13-54_259_91084841227249311-1/-ext-10002
INFO  : Loading data to table default.test from hdfs://host-10-17-80-40.coe.cloudera.com:8020/user/hive/warehouse/test/.hive-staging_hive_2015-10-08_04-13-54_259_91084841227249311-1/-ext-10000
INFO  : Table default.test stats: [numFiles=1, numRows=1, totalSize=272, rawDataSize=2]
No rows affected (25.223 seconds)
0: jdbc:hive2://10.17.80.41:10000/default> select * from test;
INFO  : Number of reduce tasks is set to 0 since there's no reduce operator
WARN  : Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
INFO  : number of splits:1
INFO  : Submitting tokens for job: job_1444272676566_0016
INFO  : Kind: HDFS_DELEGATION_TOKEN, Service: 10.17.80.40:8020, Ident: (HDFS_DELEGATION_TOKEN token 79 for hive)
INFO  : The url to track the job: http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0016/
INFO  : Starting Job = job_1444272676566_0016, Tracking URL = http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0016/
INFO  : Kill Command = /opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hadoop/bin/hadoop job  -kill job_1444272676566_0016
INFO  : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
INFO  : 2015-10-08 04:14:36,141 Stage-1 map = 0%,  reduce = 0%
INFO  : 2015-10-08 04:14:46,540 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.82 sec
INFO  : MapReduce Total cumulative CPU time: 2 seconds 820 msec
INFO  : Ended Job = job_1444272676566_0016
+-------+---------+--+
|   a   |    b    |
+-------+---------+--+
| 1000  | 1000.0  |
+-------+---------+--+
1 row selected (28.204 seconds)
0: jdbc:hive2://10.17.80.41:10000/default> alter table test change a a double;
No rows affected (0.378 seconds)
0: jdbc:hive2://10.17.80.41:10000/default> select * from test;
INFO  : Number of reduce tasks is set to 0 since there's no reduce operator
WARN  : Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
INFO  : number of splits:1
INFO  : Submitting tokens for job: job_1444272676566_0017
INFO  : Kind: HDFS_DELEGATION_TOKEN, Service: 10.17.80.40:8020, Ident: (HDFS_DELEGATION_TOKEN token 81 for hive)
INFO  : The url to track the job: http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0017/
INFO  : Starting Job = job_1444272676566_0017, Tracking URL = http://host-10-17-80-40.coe.cloudera.com:8088/proxy/application_1444272676566_0017/
INFO  : Kill Command = /opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/hadoop/bin/hadoop job  -kill job_1444272676566_0017
INFO  : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
INFO  : 2015-10-08 04:15:04,794 Stage-1 map = 0%,  reduce = 0%
INFO  : 2015-10-08 04:15:39,006 Stage-1 map = 100%,  reduce = 0%
ERROR : Ended Job = job_1444272676566_0017 with errors
Error: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask (state=08S01,code=2)
0: jdbc:hive2://10.17.80.41:10000/default>

After some research I have found the following JIRAs for this issue: HIVE-6784 and HIVE-12080.

The original issue, HIVE-6784, was created a while back; however, the upstream developers rejected the patch due to concerns about its performance impact, so the issue never got fixed.

A newer JIRA, HIVE-12080, was created recently; it is pretty much the same as HIVE-6784, but requests that the issue be fixed without introducing any performance hit.

So, at the time of writing, there is no fix for this problem yet. The workaround is to re-generate the table by running “INSERT OVERWRITE {table_name} SELECT * FROM {table_name}”, so that all data is rewritten with the new type.

For the time being, just re-generate the table as sketched below and you will be good to go.
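Here is a minimal sketch of that idea using the test table from above, assuming the ALTER has not been applied yet (or has been reverted) so the existing files are still readable; test_staging is a hypothetical staging table name:

-- Rewrite the data with the new column type first; test_staging is a hypothetical staging table
CREATE TABLE test_staging STORED AS PARQUET AS
SELECT CAST(a AS DOUBLE) AS a, b FROM test;

-- The metadata change is now safe, because the reloaded files already contain DOUBLE values
ALTER TABLE test CHANGE a a DOUBLE;
INSERT OVERWRITE TABLE test SELECT * FROM test_staging;
DROP TABLE test_staging;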

Unable to read Parquet files with same schema and different flags in Pig

Today I found a bug in Pig that prevents reading a table made up of multiple Parquet files that share the same schema but use different repetition flags (required vs optional). See the example below:

message example {
    required binary file_name (UTF8);
    required binary date_time (UTF8);
    required binary tail (UTF8);
    required binary event (UTF8);
    required binary value (UTF8);
    optional int32 record_number;
    required binary src (UTF8);
}

message schema {
    optional binary file_name;
    optional binary date_time;
    optional binary tail;
    optional binary event;
    optional binary value;
    optional int32 record_number;
    optional binary src;
} 

If you try to read these files in Pig, the following error is returned:

[main] ERROR org.apache.pig.PigServer - exception during parsing: Error during parsing. 
repetition constraint is more restrictive: can not merge type required binary file_name (UTF8) into optional binary file_name
Failed to parse: repetition constraint is more restrictive: can not merge type required binary file_name (UTF8) 
into optional binary file_name
at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:198)
at org.apache.pig.PigServer$Graph.parseQuery(PigServer.java:1676)
at org.apache.pig.PigServer$Graph.access$000(PigServer.java:1409)
at org.apache.pig.PigServer.parseAndBuild(PigServer.java:342)
at org.apache.pig.PigServer.executeBatch(PigServer.java:367)
at org.apache.pig.PigServer.executeBatch(PigServer.java:353)
at org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:140)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:202)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:173)
at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:84)
at org.apache.pig.Main.run(Main.java:478)
at org.apache.pig.PigRunner.run(PigRunner.java:49)
at org.apache.oozie.action.hadoop.PigMain.runPigJob(PigMain.java:286)
at org.apache.oozie.action.hadoop.PigMain.run(PigMain.java:226)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:39)
at org.apache.oozie.action.hadoop.PigMain.main(PigMain.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:227)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

At the time of writing, this issue affects both CDH 5.3.x and CDH 5.4.x; it is reported in PARQUET-138 but has not been fixed yet.

I also found another issue, PARQUET-139, which is fixed from CDH 5.4.0 onwards and provides us with a workaround for the problem here.

To work around the issue, we need to upgrade CDH to 5.4.x and then update the Pig script from:

data = LOAD '$path_to_source'
USING parquet.pig.ParquetLoader as(
    file_name:bytearray,
    date_time:bytearray,
    tail:bytearray,
    event:bytearray,
    value:bytearray,
    record_number:int,
    src:bytearray
);

to:

data = LOAD '$path_to_source'
USING parquet.pig.ParquetLoader(
    'file_name:bytearray,date_time:bytearray,tail:bytearray,event:bytearray,value:bytearray,record_number:int,src:bytearray'
);

So instead of passing each column definition as a separate field to the AS() clause, we pass all columns as a single schema string to ParquetLoader’s constructor. After this change, the problem should be fixed.

Hope this helps.