Hive unable to read Snappy files generated by Hive and Flume together

This article explains a workaround for Hive query failures that occur when Snappy files generated by Hive and by Flume sit under the same directory.

The following steps reproduce the issue:

  1. Start with a Hive table (from_flume) whose data is ingested by Flume
  2. Create another table (from_hive) with the same column structure
  3. Insert data into the new table by selecting from the old one; this worked, and SELECT COUNT(*) returned the correct result (a minimal shell sketch of steps 2-5 follows the stack trace below):
    INSERT INTO from_hive SELECT * FROM from_flume;

  4. At this stage, SELECT queries work on both the old and the new table
  5. Copy the files generated by Flume into the new table's location, so that Hive-generated and Flume-generated files sit under the same table's directory
  6. Running SELECT * against the table now fails with the following error:
    Error: java.io.IOException: java.io.IOException: java.lang.IndexOutOfBoundsException
    at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
    at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
    at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:226)
    at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.next(HadoopShimsSecure.java:136)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:199)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:185)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:52)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
    Caused by: java.io.IOException: java.lang.IndexOutOfBoundsException
    at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
    at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
    at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:355)
    at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:105)
    at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:41)
    at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:116)
    at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:224)
    ... 11 more
    Caused by: java.lang.IndexOutOfBoundsException
    at java.io.DataInputStream.readFully(DataInputStream.java:192)
    at org.apache.hadoop.io.Text.readWithKnownLength(Text.java:319)
    at org.apache.hadoop.io.Text.readFields(Text.java:291)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
    at org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:2245)
    at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2229)
    at org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:109)
    at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:84)
    at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:350)
    ... 15 more
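
For reference, below is a minimal shell sketch of steps 2-5. The table names follow the steps above, but the single-STRING-column schema, the compression settings and the HDFS paths are assumptions; adjust them to your own layout:

$ hive -e "SET hive.exec.compress.output=true;
           SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
           CREATE TABLE from_hive (line STRING) STORED AS SEQUENCEFILE;
           INSERT INTO from_hive SELECT * FROM from_flume;
           SELECT COUNT(*) FROM from_hive;"

# step 5: copy one of the Flume-generated snappy files under the new table's directory (hypothetical paths)
$ hadoop fs -cp /flume/collected/C1_sample.00000001.snappy /user/hive/warehouse/from_hive/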
    

This is caused by the Snappy files generated by Hive and by Flume not being compatible with each other. They work fine independently under different tables, but fail when both are placed under the same table or partition.

This comes down to the way Hive and Flume create their Snappy SequenceFiles: they write different headers:

Flume Source:


SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable)org.apache.hadoop.io.compress.SnappyCodec

Select Source:


SEQ"org.apache.hadoop.io.BytesWritableorg.apache.hadoop.io.Text)org.apache.hadoop.io.compress.SnappyCodec
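
These header strings are just the SequenceFile headers of the files themselves; one way to dump them is to print the first bytes of a file (the path below is hypothetical):

$ hadoop fs -cat /user/hive/warehouse/from_hive/000000_0 | head -c 200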

So when Hive uses the CombineHiveInputFormat class (the default) to read the Snappy files, one mapper reads multiple files. If files with the two different structures above end up in the same mapper, Hive cannot read them together properly.

The solution is to set hive.input.format to org.apache.hadoop.hive.ql.io.HiveInputFormat, which stops Hive from using the CombineHiveInputFormat class to combine multiple Snappy files when reading. This ensures that each mapper reads only one file; the side effect is that more mappers are used, or files are processed sequentially:


SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
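
For example (reusing the table name from the steps above), a query over the mixed table goes through once the setting is applied in the same session:

$ hive -e "SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
           SELECT COUNT(*) FROM from_hive;"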

Flume Collector Sink Decorator Plugin Finished

Before Christmas, I posted a blog about my trouble writing a Flume Collector Sink Decorator Plugin.

After doing some research and continuing to dig into the underlying issue, I finally got a solution, which makes me super happy.

The issue I had was caused by this error:

Exception in thread "main" java.lang.UnsupportedClassVersionError: garbagefilter/GarbageFilterDecorator : Unsupported major.minor version 51.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:634)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:186)
at com.cloudera.flume.conf.SourceFactoryImpl.loadPluginBuilders(SourceFactoryImpl.java:160)
at com.cloudera.flume.conf.SourceFactoryImpl.(SourceFactoryImpl.java:126)
at com.cloudera.flume.conf.FlumeBuilder.(FlumeBuilder.java:89)
at com.cloudera.flume.agent.LogicalNodeManager.spawn(LogicalNodeManager.java:75)
at com.cloudera.flume.agent.FlumeNode.setup(FlumeNode.java:529)
at com.cloudera.flume.agent.FlumeNode.main(FlumeNode.java:665)

This is due to the fact that I compiled my plugin under Java 1.7 and tried to run the .jar file under Java 1.6. The error did not appear in the Flume log when I ran Flume as a service with the following command, which was why I had no clue about what was going on:

$ service flume-node start

It only appeared when I ran it directly on the command line:

$ flume node -n collector1

Not exactly sure why, though. Anyway, it is now working; simply follow the steps outlined here.
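
For reference, a quick way to confirm the problem is to check the class file version, and then recompile targeting the older runtime. This is only a sketch, reusing the class name from the stack trace above and assuming the source file sits in the current directory:

# major version 51 = Java 7, 50 = Java 6
$ javap -verbose -classpath . garbagefilter.GarbageFilterDecorator | grep "major version"

# compile with the JDK 7 javac but target Java 6 bytecode (the Flume jars still need to be on the classpath)
$ javac -source 1.6 -target 1.6 garbagefilter/GarbageFilterDecorator.java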

This is my first Java code in the last five years, wow. I think I will spend more time writing Flume plugins, as we need to push more jobs to Flume to do some post-processing for us.

Flume Collector Sink Decorator Plugin

In the last few days I have been trying to create a Flume sink decorator plugin to filter out any data that we consider garbage (it is very disappointing that Flume doesn't even support a very basic filtering capability). I followed the steps on the R&D Blog with slight modifications to the HelloWorldSinkDecorator Java class. The code compiled, but when I plugged it into Flume and enabled it, the collector simply died with no error message whatsoever.

I had to stop working on this on Thursday afternoon as I had to join our company's Xmas party lunch in the city, but leaving this task unfinished before the break makes me very unhappy. I still can't find a solution and will have to keep digging when I go back to the office next week.

Flume has had so many problems since the first day I used it, and it is not flexible enough to do lots of simple tasks. I am wondering whether we should be looking for alternatives.

Very interested to see how Flume NG will turn out.

Enable Snappy Compression For Flume

Snappy is a compression/decompression library developed by Google. It aims for very high speed and reasonable compression (the output may be larger than that of other standard compression algorithms, but it is much faster). Snappy ships with Hadoop, unlike LZO compression, which is excluded due to licensing issues. To enable Snappy in your Flume installation, follow the steps below:

Install on Red Hat systems:

$ sudo yum install hadoop-0.20-native

Install on Ubuntu systems:

$ sudo apt-get install hadoop-0.20-native

This should create directories under /usr/lib/hadoop/lib/native/ containing the native Hadoop libraries.
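
A quick way to confirm is to list the directory; on a 64-bit box you should see a Linux-amd64-64 subdirectory, and Linux-i386-32 on a 32-bit one:

$ ls /usr/lib/hadoop/lib/native/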

Then create an environment config file for Flume:

$ cp /usr/lib/flume/bin/flume-env.sh.template /usr/lib/flume/bin/flume-env.sh

And update the last line in the file to be:

For a 32-bit platform:

export JAVA_LIBRARY_PATH=/usr/lib/hadoop/lib/native/Linux-i386-32

For a 64-bit platform:

export JAVA_LIBRARY_PATH=/usr/lib/hadoop/lib/native/Linux-amd64-64

Next, update Flume's configuration file, /etc/flume/conf/flume-site.xml, on the collector node to include:

  <property>
    <name>flume.collector.dfs.compress.codec</name>
    <value>SnappyCodec</value>
    <description>Writes formatted data compressed in specified codec to
    dfs. Value is None, GzipCodec, DefaultCodec (deflate), BZip2Codec, SnappyCodec
    or any other Codec Hadoop is aware of </description>
  </property>

Finally, restart the flume-node:

$ /etc/init.d/flume-node restart

Your next file update in HDFS will look something like the following:

-rw-r--r--   3 flume supergroup          0 2011-10-21 14:01 /data/traffic/Y2011_M9_W37_D254/R0_P0/C1_20111021-140124175+1100.955183363700204.00000244.snappy.tmp
-rw-r--r--   3 flume supergroup   35156526 2011-10-20 16:51 /data/traffic/Y2011_M9_W37_D254/R0_P0/C2_20111020-164928958+1100.780424004236302.00000018.snappy
-rw-r--r--   3 flume supergroup     830565 2011-10-20 17:15 /data/traffic/Y2011_M9_W37_D254/R0_P0/C2_20111020-171423368+1100.781918413572302.00000018.snappy
-rw-r--r--   3 flume supergroup          0 2011-10-20 17:19 /data/traffic/Y2011_M9_W37_D254/R0_P0/C2_20111020-171853599+1100.782188644505302.00000042.snappy.tmp
-rw-r--r--   3 flume supergroup    1261171 2011-10-20 17:37 /data/traffic/Y2011_M9_W37_D254/R0_P0/C2_20111020-173728225+1100.783303271088302.00000018.snappy
-rw-r--r--   3 flume supergroup    2128701 2011-10-20 17:40 /data/traffic/Y2011_M9_W37_D254/R0_P0/C2_20111020-174024045+1100.783479090669302.00000046.snappy
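
To spot-check that one of these files is actually readable (the client also needs the native Snappy library installed as above), dumping it with hadoop fs -text should print the decoded records; the path is just one of the files from the listing:

$ hadoop fs -text /data/traffic/Y2011_M9_W37_D254/R0_P0/C2_20111020-164928958+1100.780424004236302.00000018.snappy | head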

Happy Fluming..

Compile Hadoop LZO Compression Library on CentOS

To compile and install Hadoop's LZO compression library on CentOS, follow the steps below:

Download the Hadoop LZO source from Kevin's Hadoop LZO Project.

If you are using an Ant version older than 1.7, please download the latest Ant binary package from Apache Ant, otherwise you will get the following error when compiling:

BUILD FAILED
/root/kevinweil-hadoop-lzo-4c5a227/build.xml:510: Class org.apache.tools.ant.taskdefs.ConditionTask doesn't support the nested "typefound" element.

Install lzo-devel:

yum install lzo-devel.x86_64
yum install lzop.x86_64

Unzip Apache Ant and the Hadoop LZO compression library somewhere you have access to.

unzip kevinweil-hadoop-lzo-4c5a227.zip
cd kevinweil-hadoop-lzo-4c5a227

Do the following if your ant version is less than 1.7:

export ANT_HOME=<path_to_ant_downloaded_dir>

Then

<path_to_ant> compile-native tar

In my case that is:

~/apache-ant-1.8.2/bin/ant compile-native tar

Carefully check the compiler output. If you see errors like this:

     [exec] checking for strerror_r... yes
     [exec] checking whether strerror_r returns char *... yes
     [exec] checking for mkdir... yes
     [exec] checking for uname... yes
     [exec] checking for memset... yes
     [exec] checking for JNI_GetCreatedJavaVMs in -ljvm... no
     [exec] checking jni.h usability... 
     [exec] configure: error: Native java headers not found. Is $JAVA_HOME set correctly?
     [exec] no
     [exec] checking jni.h presence... no
     [exec] checking for jni.h... no

BUILD FAILED
/..../kevinweil-hadoop-lzo-4c5a227/build.xml:247: exec returned: 1

Then you will need to find your Java path, update the build.xml file and add a "JAVA_HOME" setting to the <exec> task on line 247; in my case the path is "/usr/lib/jvm/java/":

<exec dir="${build.native}" executable="sh" failonerror="true">
   <env key="OS_NAME" value="${os.name}"/>
   <env key="JAVA_HOME" value="/usr/lib/jvm/java/" />
   <env key="OS_ARCH" value="${os.arch}"/>
   <env key="JVM_DATA_MODEL" value="${sun.arch.data.model}"/>
   <env key="NATIVE_SRCDIR" value="${native.src.dir}"/>
   <arg line="${native.src.dir}/configure"/>
</exec>

and re-run the compiler:

<path_to_ant> compile-native tar
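
Alternatively, exporting JAVA_HOME in the shell before invoking Ant may also do the trick, since Ant's <exec> task passes the inherited environment on to the configure script; I have not verified this, so treat it as an untested sketch:

export JAVA_HOME=/usr/lib/jvm/java/
~/apache-ant-1.8.2/bin/ant compile-native tar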

If everything goes well, you should get a "BUILD SUCCESSFUL" message at the end of the compile process.

Now do

ls -al build

in the current directory and you will see the following files generated:

drwxr-xr-x 9 root root    4096 Oct 19 17:21 .
drwxr-xr-x 6 root root    4096 Oct 19 17:21 ..
drwxr-xr-x 4 root root    4096 Oct 19 17:21 classes
drwxr-xr-x 3 root root    4096 Oct 19 17:21 docs
drwxr-xr-x 6 root root    4096 Oct 19 17:21 hadoop-lzo-0.4.14
-rw-r--r-- 1 root root   62239 Oct 19 17:21 hadoop-lzo-0.4.14.jar
-rw-r--r-- 1 root root 1824851 Oct 19 17:21 hadoop-lzo-0.4.14.tar.gz
drwxr-xr-x 5 root root    4096 Oct 19 16:59 ivy
drwxr-xr-x 3 root root    4096 Oct 19 17:12 native
drwxr-xr-x 2 root root    4096 Oct 19 17:12 src
drwxr-xr-x 3 root root    4096 Oct 19 17:12 test

The most important one is hadoop-lzo-0.4.14.jar, which can be copied to Hadoop's library directory, ready to be used.
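
As a rough sketch of that final step (the destination paths assume a CDH-style layout under /usr/lib/hadoop, and the exact location of the native libraries inside build/ can vary between hadoop-lzo versions):

# copy the jar and the compiled native libraries into Hadoop's library path
sudo cp build/hadoop-lzo-0.4.14.jar /usr/lib/hadoop/lib/
sudo cp build/native/Linux-amd64-64/lib/libgplcompression.* /usr/lib/hadoop/lib/native/Linux-amd64-64/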