Hive on Spark query failed with ConnectTimeoutException

Recently I have been dealing with an issue where a Hive on Spark job intermittently failed with ConnectTimeoutException. The connection timed out while the ApplicationMaster was trying to communicate back to HiveServer2 on a random port, and it failed after only 2 seconds of trying to connect. See the stack trace below for details:

17/05/03 03:20:06 INFO yarn.ApplicationMaster: Waiting for spark context initialization
17/05/03 03:20:06 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
17/05/03 03:20:06 INFO client.RemoteDriver: Connecting to: <hs2-host>:35915
17/05/03 03:20:08 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.ExecutionException: 
io.netty.channel.ConnectTimeoutException: connection timed out: <hs2-host>/172.19.22.11:35915 
java.util.concurrent.ExecutionException: io.netty.channel.ConnectTimeoutException: connection timed out: <hs2-host>/172.19.22.11:35915 
at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:37) 
at org.apache.hive.spark.client.RemoteDriver.<init>(RemoteDriver.java:156) 
at org.apache.hive.spark.client.RemoteDriver.main(RemoteDriver.java:556) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542) 
Caused by: io.netty.channel.ConnectTimeoutException: connection timed out: <hs2-host>/172.19.22.11:35915 
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:220) 
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) 
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120) 
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
at java.lang.Thread.run(Thread.java:745) 
17/05/03 03:20:08 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.ExecutionException: 
io.netty.channel.ConnectTimeoutException: connection timed out: <hs2-host>/172.19.22.11:35915) 
17/05/03 03:20:16 ERROR yarn.ApplicationMaster: SparkContext did not initialize after waiting for 100000 ms. Please check earlier log output for errors. Failing the application. 
17/05/03 03:20:16 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.util.concurrent.ExecutionException: 
io.netty.channel.ConnectTimeoutException: connection timed out: <hs2-host>/172.19.22.11:35915) 
17/05/03 03:20:16 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1492040605432_11445 
17/05/03 03:20:16 INFO util.ShutdownHookManager: Shutdown hook called

We can see from the log above that the connection timed out just 2 seconds after the attempt started, which is a suspiciously short period for a connect timeout.

After digging further into the code, it turned out that this timeout is controlled by a Hive setting called hive.spark.client.connect.timeout. Its default value is 1000ms, only 1 second, which explains the behaviour.
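
You can confirm the value in effect for your session by running SET with just the property name, which makes Hive print the current setting:

    SET hive.spark.client.connect.timeout;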

This issue only happens when the cluster is under heavy load and HiveServer2 is not able to respond to the ApplicationMaster within 1 second, at which point the connection times out.

To work around this issue, we can simply increase the timeout to, say, 5 seconds:

SET hive.spark.client.connect.timeout=5000;

-- your query here

I have reported this issue upstream as HIVE-16794, and I will submit a patch to increase this default timeout soon.

Alternative Timestamp Support in Hive (ISO-8601)

By default, Hive does not support the ISO-8601 timestamp format, such as “2017-02-16T11:24:29.000Z”.

Check the following test case:

1. Create a file with the following content:

2017-02-16T11:24:29.000Z
2017-02-16 11:24:29

2. Put the file in HDFS:

hadoop fs -put test.txt /tmp/test/data

3. Create an external table that points to it:

CREATE EXTERNAL TABLE ts_test (a timestamp) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/tmp/test/data';

4. When you select from the table, the first record will be NULL:
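
To reproduce the output below, a plain select is enough:

    SELECT * FROM ts_test;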

+------------------------+--+
|       ts_test.a        |
+------------------------+--+
| NULL                   |
| 2017-02-16 11:24:29.0  |
+------------------------+--+

This is because Hive is not able to recognise the timestamp format “2017-02-16T11:24:29.000Z”.

As of CDH 5.7.x or Hive 1.2, Hive supports reading alternative timestamp formats; see HIVE-9298.

To make it work, run the following Hive query:

ALTER TABLE ts_test SET SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSSZ");

The data can then be read correctly by Hive:

+------------------------+--+
|       ts_test.a        |
+------------------------+--+
| 2017-02-16 03:24:29.0  |
| 2017-02-16 11:24:29.0  |
+------------------------+--+

The difference in values is due to timezone conversion (the “Z” suffix denotes UTC). Hive treats “2017-02-16T11:24:29.000Z” as UTC and converts it to the server’s local time; for the second value, “2017-02-16 11:24:29”, no conversion is done, so the original value is returned.
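
Note that timestamp.formats accepts a comma-separated list of patterns (in Java SimpleDateFormat syntax), so you can declare several alternatives at once. A minimal sketch, assuming you also want to accept the same format without the milliseconds part:

    ALTER TABLE ts_test SET SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSSZ,yyyy-MM-dd'T'HH:mm:ss");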

Hope this helps.

How to enable HiveServer2 audit log through Cloudera Manager

This article explains the steps required to enable the audit log for HiveServer2, so that all queries run through HiveServer2 are recorded in a central audit log file.

Please follow the steps below:

  1. Go to Cloudera Manager home page > Hive > Configuration
  2. Tick “Enable Audit Collection”
  3. Ensure the “Audit Log Directory” location points to a path that has enough disk space
  4. Go to Cloudera Manager home page > click on “Cloudera Management Service” > Instances
  5. Click on “Add Role Instances” button on the top right corner of the page
  6. Choose a host for Navigator Audit Server & Navigator Metadata Server
  7. Then follow on screen instructions to finish adding the new roles
  8. Once the roles are added successfully, Cloudera Manager will ask you to restart a few services, including Hive
  9. Go ahead and restart Hive

After the restart, Hive’s audit log will be enabled and written to the /var/log/hive/audit directory by default.
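
To confirm that audit events are being written, run a query through HiveServer2 and then check the directory; the exact file names vary by version, so the wildcard below is an assumption:

    # list the audit files and follow the latest entries
    ls -lh /var/log/hive/audit/
    tail -f /var/log/hive/audit/*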

Please note that you are not required to start the Navigator services; if you don’t need them running, you can leave them in the STOPPED state and Hive’s audit logs will still function as normal. However, Navigator does need to be installed for the audit log to work properly, as it supplies some libraries that the audit mechanism requires.

Beeline Failed To Start With OOM Error When Calling getConsoleReader Method

If you get the following error when trying to start up beeline from the command line:

Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit 
at java.util.Arrays.copyOf(Arrays.java:2271) 
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) 
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) 
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:122) 
at org.apache.hive.beeline.BeeLine.getConsoleReader(BeeLine.java:854) 
at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:766) 
at org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:480) 
at org.apache.hive.beeline.BeeLine.main(BeeLine.java:463) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.hadoop.util.RunJar.run(RunJar.java:221) 
at org.apache.hadoop.util.RunJar.main(RunJar.java:136) 

Based on the stack trace, we can see that Beeline was in its startup phase, initializing through the getConsoleReader method, which reads data from Beeline’s history file:

    try {
      // now load in the previous history
      if (hist != null) {
        History h = consoleReader.getHistory();
        if (h instanceof FileHistory) {
          ((FileHistory) consoleReader.getHistory()).load(new ByteArrayInputStream(hist
              .toByteArray()));
        } else {
          consoleReader.getHistory().add(hist.toString());
        }
      }
    } catch (Exception e) {
        handleException(e);
    }

By default, the history file is located at ~/.beeline/history, and Beeline loads the latest 500 entries into memory. If those queries are very large, the history file can grow to several GBs, and when Beeline tries to load such a big file into memory it will eventually fail with the OutOfMemoryError shown above.

I have reported this issue upstream as HIVE-15166, and I am in the middle of submitting a patch for it.

For the time being, the best workaround is to remove the ~/.beeline/history file before you fire up Beeline.
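
A quick check-and-clean from the shell, keeping a backup in case you still want the old history:

    # see how large the history file has grown
    du -sh ~/.beeline/history
    # move it out of the way rather than deleting it outright
    mv ~/.beeline/history ~/.beeline/history.bak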

Hive unable to read Snappy files generated by Hive and Flume together

This article explains the workaround for Hive query failures when processing Snappy files generated by Hive and Flume under the same directory.

The following are the steps to reproduce the issue:

  1. Start with a Hive table (from_flume) whose data is ingested by Flume
  2. Create another table with the same column structure (from_hive)
  3. Insert data into the new table by selecting from the old one; this worked, and SELECT COUNT(*) returned the correct result:
    INSERT INTO from_hive SELECT * FROM from_flume;
    
  4. At this stage, SELECT queries work on both the old and new tables
  5. Copy the files generated by Flume into the new table’s location, so that both sets of files sit under the same table’s location (see the example copy command after this list)
  6. Doing a SELECT * from the table will now result in the following error:
    Error: java.io.IOException: java.io.IOException: java.lang.IndexOutOfBoundsException
    at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
    at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
    at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:226)
    at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.next(HadoopShimsSecure.java:136)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:199)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:185)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:52)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
    Caused by: java.io.IOException: java.lang.IndexOutOfBoundsException
    at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
    at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
    at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:355)
    at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:105)
    at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:41)
    at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:116)
    at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:224)
    ... 11 more
    Caused by: java.lang.IndexOutOfBoundsException
    at java.io.DataInputStream.readFully(DataInputStream.java:192)
    at org.apache.hadoop.io.Text.readWithKnownLength(Text.java:319)
    at org.apache.hadoop.io.Text.readFields(Text.java:291)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
    at org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:2245)
    at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2229)
    at org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:109)
    at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:84)
    at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:350)
    ... 15 more
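
For reference, step 5 above is just a plain HDFS copy of Flume’s output files into the Hive table’s directory; the paths below are hypothetical and will differ in your environment:

    # copy Flume-generated Snappy files into the Hive table's location
    hadoop fs -cp /user/flume/from_flume/* /user/hive/warehouse/from_hive/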
    

This is caused by the fact that the Snappy files generated by Hive and Flume are not compatible. They work fine independently under different tables, but queries fail when both kinds of files are placed under the same table or partition.

This comes down to the way Hive and Flume create their Snappy SequenceFiles: they use different key/value classes, which is visible in the file headers:

Flume Source:


SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable)org.apache.hadoop.io.compress.SnappyCodec

Select Source:


SEQ"org.apache.hadoop.io.BytesWritableorg.apache.hadoop.io.Text)org.apache.hadoop.io.compress.SnappyCodec

So when Hive uses the CombineHiveInputFormat class (the default) to read Snappy files, one mapper can read multiple files; if files with the two different structures end up in the same mapper, Hive is not able to read them together properly.

The solution is to set hive.input.format to org.apache.hadoop.hive.ql.io.HiveInputFormat, which stops Hive from combining multiple Snappy files into one split when reading. This ensures that each mapper reads only one file, with the side effect that more mappers will be used, or files will be processed sequentially:


SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
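
The setting is per-session, so run it just before the affected query; a minimal sketch using the table from the repro above:

    SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    SELECT * FROM from_hive;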