Impala Auto Update Metadata Support

Many CDH users have requested that Impala support automatic metadata updates, so that they do NOT need to run “INVALIDATE METADATA” every time a table is created or data is updated through other components, such as Hive or Pig.

I would like to share that this has been reported upstream and is tracked via JIRA: IMPALA-3124. It has not been fixed yet, and it requires a detailed design to make sure it will work properly.

There is no ETA at this stage on when this feature will be added. I would advise anyone interested in this feature to add comments to the JIRA. The more votes it gets, the better the chance that it will be prioritized.

Hope the above information is helpful.

Impala Reported Corrupt Parquet File After Failing With OutOfMemory Error

Recently I was dealing with an issue where Impala reported a Corrupt Parquet File after a query failed with an OutOfMemory error; however, when the query did not fail, no corruption was reported.

See the error message reported in the Impala Daemon logs below:

Memory limit exceeded
HdfsParquetScanner::ReadDataPage() failed to allocate 65535 bytes for decompressed data.
Corrupt Parquet file 'hdfs://nameservice1/path/to/file/914164e7120e6076-cdae1be60000001f_169433548_data.0.parq': column 'client_ord_id' had 1024 remaining values but expected 0 _
[Executed: 4/29/2017 5:28:58 AM] [Execution: 588ms]

This is reported in the upstream JIRA: IMPALA-5197. It can happen in the following scenarios:

  • Query failed with OOM error
  • There is a LIMIT clause in the query
  • Query is manually cancelled by the user

These corruption messages do not mean the file is really corrupted; they are caused by the Impala bug mentioned earlier, IMPALA-5197.

If it is caused by an OutOfMemory error, simply increase the memory limit for the query and try again:

SET MEM_LIMIT=10g;
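
For example, in impala-shell you could raise the limit for the session and then re-run the failing statement. This is only a sketch: the table name below is hypothetical and the host is a placeholder.

[impala-daemon-host:21000] > SET MEM_LIMIT=10g;
[impala-daemon-host:21000] > SELECT COUNT(*) FROM my_parquet_table;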

For the other two causes, we will need to wait for IMPALA-5197 to be fixed.

How to Use Beeline to Connect to Impala

You can certainly connect to Impala using the Hive driver from Beeline, with a command like the one below:

beeline -u 'jdbc:hive2://<impala-daemon-host>:21050/default;auth=noSasl'

However, the result output format does not work properly:

> show tables;
customers
dim_prod
mansi
sample_07
sample_08
small
web_logs
+-------+--+
| name  |
+-------+--+
+-------+--+

Notice that the table names are printed outside the table columns?

The better approach is to use the Cloudera Impala JDBC driver. Please follow the steps below:

1. Download the driver from the Cloudera Impala JDBC Driver page
2. Extract the files and put them somewhere on the host where you need to run the beeline command; in my case they are under /root/impala-jdbc/jdbc
3. Run the following command to update the HADOOP_CLASSPATH to include the Impala JDBC Driver JARs:

export HADOOP_CLASSPATH=`hadoop classpath`:/root/impala-jdbc/jdbc/*
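
Optionally, confirm that the driver JARs were picked up. This is just a sanity check using the path from my setup; adjust it if you extracted the driver elsewhere:

ls /root/impala-jdbc/jdbc/*.jar
echo $HADOOP_CLASSPATH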

4. Finally you are ready to start beeline:

beeline -d "com.cloudera.impala.jdbc41.Driver" -u 'jdbc:impala://<impala-daemon-host>:21050;AuthMech=0'

You will need to tell Beeline the class name of the driver using the “-d” option; in my case the driver class is com.cloudera.impala.jdbc41.Driver.

The output looks much better:

> show tables;
+------------+--+
|    name    |
+------------+--+
| customers  |
| dim_prod   |
| mansi      |
| sample_07  |
| sample_08  |
| small      |
| web_logs   |
+------------+--+
7 rows selected (0.219 seconds)

However, the best way is to connect from impala-shell, which is designed natively for Impala.
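
For example, to connect to the same daemon with impala-shell (21000 is the default impala-shell port; the host name below is a placeholder):

impala-shell -i <impala-daemon-host>:21000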

Hope this helps.

Impala query failed with error “IllegalStateException”

This article examines ONE of the possible causes of the issue where an Impala query fails with an IllegalStateException error.

Recently I was dealing with an Impala issue where running a simple SELECT query against a table failed with an IllegalStateException error:

SELECT * FROM <table_name>;
Query: SELECT * FROM <table_name>
ERROR: IllegalStateException: null

Checking the impala daemon log shows the following stacktrace:

I1114 17:13:04.425281 18380 jni-util.cc:177] java.lang.IllegalStateException
at com.google.common.base.Preconditions.checkState(Preconditions.java:129)
at com.cloudera.impala.analysis.SlotRef.getIdsHelper(SlotRef.java:206)
at com.cloudera.impala.analysis.Expr.getIds(Expr.java:887)
at com.cloudera.impala.analysis.Analyzer.materializeSlots(Analyzer.java:2063)
at com.cloudera.impala.planner.SingleNodePlanner.createSingleNodePlan(SingleNodePlanner.java:131)
at com.cloudera.impala.planner.Planner.createPlan(Planner.java:58)
at com.cloudera.impala.service.Frontend.createExecRequest(Frontend.java:897)
at com.cloudera.impala.service.JniFrontend.createExecRequest(JniFrontend.java:147)

Based on the issues I had seen in the past, most occurrences of this error were caused by an INVALID_TYPE column in the table that Impala was trying to query. Check the DDL of the table in question by running the following query:

SHOW CREATE TABLE <table_name>;

[impala-daemon-host:21000] > SHOW CREATE TABLE test;
Query: SHOW CREATE TABLE test
+---------------------------------+
| result                          |
+---------------------------------+
| CREATE EXTERNAL TABLE test (    |
| col1 INVALID_TYPE
.....

You can see that Impala complained about INVALID_TYPE for column “col1”. If you run the same query in beeline, you will get the correct output:

> SHOW CREATE TABLE test;
Query: SHOW CREATE TABLE test
+---------------------------------+
| result                          |
+---------------------------------+
| CREATE EXTERNAL TABLE test (    |
| col1 date
.....

This is because Impala currently does not support the “DATE” data type; please refer to the following docs:

Impala Data Types
HiveQL Features not Available in Impala

The only solution for now is to use the “TIMESTAMP” data type, which is supported by Impala.
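
As a minimal sketch (the table name, column name and location below are hypothetical, and it assumes the underlying data is stored in a format Impala can read as TIMESTAMP), the table could be recreated with TIMESTAMP instead of DATE:

-- Recreate the table definition with a type Impala supports
CREATE EXTERNAL TABLE test_ts (
  col1 TIMESTAMP
)
STORED AS TEXTFILE
LOCATION '/path/to/table/data';

-- Refresh Impala's metadata and query the new table
INVALIDATE METADATA test_ts;
SELECT * FROM test_ts;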

How to confirm Dynamic Partition Pruning works in Impala

This article explains how to confirm Impala’s new Dynamic Partition Pruning feature is effective in CDH5.7.x.

Dynamic Partition Pruning is a new feature introduced in CDH 5.7.x / Impala 2.5, where information about partitions is collected at run time and Impala prunes unnecessary partitions in ways that were impractical to predict in advance. I have the following test case to show that Dynamic Partition Pruning is working effectively.

1. Create tables with testing data:

CREATE TABLE yy (s string) PARTITIONED BY (year int) STORED AS PARQUET;
INSERT INTO yy PARTITION (year) VALUES ('1999', 1999), ('2000', 2000),
  ('2001', 2001), ('2010',2010);
COMPUTE STATS yy;

CREATE TABLE yy2 (s string) PARTITIONED BY (year int) STORED AS PARQUET;
INSERT INTO yy2 PARTITION (year) VALUES ('1999', 1999), ('2000', 2000),
  ('2001', 2001);
COMPUTE STATS yy2;

It is important to run COMPUTE STATS on those tables to make Dynamic Partition Pruning effective.
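
You can verify that the statistics were collected with SHOW TABLE STATS, which also lists the partitions of each table, for example:

SHOW TABLE STATS yy;
SHOW TABLE STATS yy2;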

2. Run the following query:

Query: SELECT s FROM yy2 WHERE year IN (SELECT MIN(year) FROM yy LIMIT 1)
+------+
| s    |
+------+
| 1999 |
+------+
Fetched 1 row(s) in 0.55s

The result returned is as expected, because the minimum value of the “year” column in table “yy” is 1999, and it is used in the WHERE condition to limit the result from table “yy2”.

3. Check the explain query:

Query: explain SELECT s FROM yy2 WHERE year IN (SELECT MIN(year) FROM yy LIMIT 1)
+----------------------------------------------------------+
| Explain String                                           |
+----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=16.00MB VCores=1 |
|                                                          |
| 07:EXCHANGE [UNPARTITIONED]                              |
| |                                                        |
| 03:HASH JOIN [LEFT SEMI JOIN, BROADCAST]                 |
| |  hash predicates: year = min(year)                     |
| |  runtime filters: RF000 <- min(year)                   |
| |                                                        |
| |--06:EXCHANGE [BROADCAST]                               |
| |  |                                                     |
| |  05:AGGREGATE [FINALIZE]                               |
| |  |  output: min:merge(year)                            |
| |  |  limit: 1                                           |
| |  |                                                     |
| |  04:EXCHANGE [UNPARTITIONED]                           |
| |  |                                                     |
| |  02:AGGREGATE                                          |
| |  |  output: min(year)                                  |
| |  |                                                     |
| |  01:SCAN HDFS [default.yy]                             |
| |     partitions=4/4 files=4 size=936B                   |
| |                                                        |
| 00:SCAN HDFS [default.yy2]                               |
|    partitions=3/3 files=3 size=702B                      |
|    runtime filters: RF000 -> year                        |
+----------------------------------------------------------+
Fetched 25 row(s) in 0.05s

We can see that table “yy2” was scheduled to be scanned through all 3 partitions (1999, 2000 and 2001). This is expected because at compile time Impala does not know the result of “SELECT MIN(year) FROM yy LIMIT 1”, so it plans to scan all partitions of table “yy2”.

4. Let’s check the SUMMARY of the query:

Operator          #Hosts   Avg Time   Max Time  #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail                    
----------------------------------------------------------------------------------------------------------------------
07:EXCHANGE            1    0.000ns    0.000ns      1           1         0        -1.00 B  UNPARTITIONED             
03:HASH JOIN           3  180.999ms  259.000ms      1           1   2.02 MB         5.00 B  LEFT SEMI JOIN, BROADCAST 
|--06:EXCHANGE         3    0.000ns    0.000ns      1           1         0              0  BROADCAST                 
|  05:AGGREGATE        1  215.000ms  215.000ms      1           1  24.00 KB        -1.00 B  FINALIZE                  
|  04:EXCHANGE         1    0.000ns    0.000ns      3           1         0        -1.00 B  UNPARTITIONED             
|  02:AGGREGATE        3  147.333ms  168.000ms      3           1  20.00 KB       10.00 MB                            
|  01:SCAN HDFS        3   28.333ms   39.000ms      4           4  36.00 KB              0  default.yy            
00:SCAN HDFS           3    2.000ms    6.000ms      1           3  46.00 KB       16.00 MB  default.yy2

Check the last row, the SCAN HDFS operator for table “yy2”: “Est. #Rows” was 3, but the actual “#Rows” scanned was only 1. This confirms that Dynamic Partition Pruning is working, as only one partition/row was scanned instead of 3 partitions/rows.

5. Another way to confirm that Dynamic Partition Pruning is effective is to check the Filter counters in the SCAN operator for each node in the query PROFILE. In my test I have 3 hosts.
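
If you run the query from impala-shell, the profile of the most recently executed query can be printed with the PROFILE command, for example:

[impala-daemon-host:21000] > SELECT s FROM yy2 WHERE year IN (SELECT MIN(year) FROM yy LIMIT 1);
[impala-daemon-host:21000] > PROFILE;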

– Host one:

Filter 0
    - Files processed: 1
    - Files rejected: 0
    - Files total: 1

– Host two:

Filter 0
    - Files processed: 1
    - Files rejected: 1
    - Files total: 1

– Host three:

Filter 0
    - Files processed: 1
    - Files rejected: 1
    - Files total: 1

From the above, we can see that each host processed one file; however, two of them rejected their files due to Dynamic Partition Pruning.