Enabling Snappy Compression Support in Hadoop 2.4 under CentOS 6.3

When Hadoop is installed manually from the binary package on CentOS, Snappy compression is not supported out of the box, and a few extra steps are required to make it work in Hadoop. They are straightforward, but not obvious if you don’t know what to look for.

Firstly, if you are using the 64-bit version of CentOS, you will need to replace the native Hadoop library that ships with Hadoop, because it is compiled for 32-bit only. You can try to download a 64-bit build from here and put it under the “$HADOOP_HOME/lib/native” directory. If the library there is a symlink, replace the symlink with the actual file. If it still doesn’t work, you might need to compile the native library on your own machine; that is out of scope for this post, but you can follow the instructions on this site.
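To see whether a given native library is 32-bit or 64-bit, the usual quick check is the `file` command on the library. The Java sketch below does the same check by reading the ELF header directly (byte 4, EI_CLASS, is 1 for 32-bit and 2 for 64-bit). The default path /usr/local/hadoop/lib/native/libhadoop.so is an assumption you can override with the first argument, and the code sticks to Java 7 APIs to match the JDK used in these posts.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.RandomAccessFile;

final class ElfClassCheck {

    /**
     * Reads the first five bytes of an ELF file: the magic number 0x7F 'E' 'L' 'F',
     * followed by EI_CLASS, which is 1 for 32-bit and 2 for 64-bit objects.
     */
    static String elfClass(String path) throws IOException {
        byte[] header = new byte[5];
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) { // follows symlinks
            file.readFully(header);
        } catch (EOFException e) {
            return "not an ELF file"; // too short to hold an ELF header
        }
        if (header[0] != 0x7F || header[1] != 'E' || header[2] != 'L' || header[3] != 'F') {
            return "not an ELF file";
        }
        switch (header[4]) {
            case 1:
                return "32-bit";
            case 2:
                return "64-bit";
            default:
                return "unknown ELF class";
        }
    }

    public static void main(String[] args) throws IOException {
        // Default path is an assumption; pass the real library path as the first argument.
        String path = args.length > 0 ? args[0] : "/usr/local/hadoop/lib/native/libhadoop.so";
        System.out.println(path + ": " + elfClass(path));
    }
}
```

If it reports "32-bit" on a 64-bit CentOS machine, that library is the one to replace.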

Secondly, you will need to install the native Snappy library for your operating system (CentOS 6.3 in my case):

$ sudo yum install snappy snappy-devel

This will create a file called libsnappy.so under the /usr/lib64 directory; we need to create a link to this file under “$HADOOP_HOME/lib/native”:

sudo ln -s /usr/lib64/libsnappy.so $HADOOP_HOME/lib/native/libsnappy.so

Then update three configuration files:





Finally, add the following line to $HADOOP_HOME/etc/hadoop/hadoop-env.sh to tell Hadoop exactly where to load the native libraries from:

export JAVA_LIBRARY_PATH="/usr/local/hadoop/lib/native"
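Before restarting, it can be worth confirming that the directory in JAVA_LIBRARY_PATH really contains both libraries and that neither is a dangling symlink (for example, a link created before snappy-devel was installed). A minimal Java 7 sketch; the default directory and the two file names come from the steps above, and anything else is an assumption.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

final class NativeDirCheck {

    /** Returns one message per expected library that is missing or is a symlink to nothing. */
    static List<String> problems(Path nativeDir, String... libraries) {
        List<String> problems = new ArrayList<String>();
        for (String name : libraries) {
            Path lib = nativeDir.resolve(name);
            // isSymbolicLink does not follow links; exists() does, so a broken link fails exists().
            if (Files.isSymbolicLink(lib) && !Files.exists(lib)) {
                problems.add(name + " is a dangling symlink");
            } else if (!Files.exists(lib)) {
                problems.add(name + " is missing");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Default mirrors the JAVA_LIBRARY_PATH export above; override with the first argument.
        String dir = args.length > 0 ? args[0] : "/usr/local/hadoop/lib/native";
        List<String> found = problems(Paths.get(dir), "libhadoop.so", "libsnappy.so");
        if (found.isEmpty()) {
            System.out.println("All native libraries present in " + dir);
        }
        for (String problem : found) {
            System.out.println(problem);
        }
    }
}
```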

That’s it. Just restart HDFS and YARN by running:

$ stop-yarn.sh
$ stop-dfs.sh
$ start-dfs.sh
$ start-yarn.sh

Now you should be able to create Hive tables with Snappy compression.

Getting “protobuf” Error When Using Shark 0.9.1 with Hadoop 2.4

Recently I needed to experiment with Shark so that I could compare its performance with Hive at work. To do this, I needed to set up a single-node cluster on one of my VirtualBox machines.

Installing Hadoop and Hive was quite smooth; you can refer back to my post for the instructions. However, I ran into problems when installing Shark and Spark.

Spark ran without problems, but Shark kept failing whenever I tried to run a query:

shark> select * from test;
17.096: [Full GC 71198K->24382K(506816K), 0.3150970 secs]
Exception in thread "main" java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.getDeclaredMethods0(Native Method)
	at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
	at java.lang.Class.privateGetPublicMethods(Class.java:2651)
	at java.lang.Class.privateGetPublicMethods(Class.java:2661)
	at java.lang.Class.getMethods(Class.java:1467)
	at sun.misc.ProxyGenerator.generateClassFile(ProxyGenerator.java:426)
	at sun.misc.ProxyGenerator.generateProxyClass(ProxyGenerator.java:323)
	at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:636)
	at java.lang.reflect.Proxy.newProxyInstance(Proxy.java:722)
	at org.apache.hadoop.ipc.ProtobufRpcEngine.getProxy(ProtobufRpcEngine.java:92)
	at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:537)
	at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:334)
	at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:241)
	at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141)
	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:576)
	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:521)
	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:146)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
	at org.apache.hadoop.hive.ql.Context.getScratchDir(Context.java:180)
	at org.apache.hadoop.hive.ql.Context.getMRScratchDir(Context.java:231)
	at org.apache.hadoop.hive.ql.Context.getMRTmpFileURI(Context.java:288)
	at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:1274)
	at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:1059)
	at shark.parse.SharkSemanticAnalyzer.analyzeInternal(SharkSemanticAnalyzer.scala:137)
	at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:279)
	at shark.SharkDriver.compile(SharkDriver.scala:215)
	at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:337)
	at org.apache.hadoop.hive.ql.Driver.run(Driver.java:909)
	at shark.SharkCliDriver.processCmd(SharkCliDriver.scala:338)
	at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:413)
	at shark.SharkCliDriver$.main(SharkCliDriver.scala:235)
	at shark.SharkCliDriver.main(SharkCliDriver.scala)

The stack trace does not make it obvious, but the error is caused by the “protobuf” package (com.google.protobuf) bundled inside the JAR file “hive-exec-0.11.0-shark-0.9.1.jar”, located at $SHARK_HOME/lib_managed/jars/edu.berkeley.cs.shark/hive-exec. Those bundled classes clash with the protobuf version Hadoop 2.4 was built against, so all you need to do is remove the package from the JAR completely:

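cd $SHARK_HOME/lib_managed/jars/edu.berkeley.cs.shark/hive-exec
unzip hive-exec-0.11.0-shark-0.9.1.jar
rm -rf com/google/protobuf
# move the original JAR aside; zipping into the existing JAR would only update it and keep the protobuf entries
mv hive-exec-0.11.0-shark-0.9.1.jar ~/hive-exec-0.11.0-shark-0.9.1.jar.bak
zip -r hive-exec-0.11.0-shark-0.9.1.jar *
rm -fr com hive-exec-log4j.properties javaewah/ javax/ javolution/ META-INF/ org/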
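Before restarting, you can confirm that the rebuilt JAR no longer contains any com/google/protobuf/ entries. This sketch uses only java.util.zip; the default JAR name matches the one above and assumes you run it from the hive-exec directory (or pass the full path as the first argument).

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

final class JarPackageCheck {

    /** Lists archive entries whose names start with the given prefix, e.g. "com/google/protobuf/". */
    static List<String> entriesUnder(String jarPath, String prefix) throws IOException {
        List<String> matches = new ArrayList<String>();
        try (ZipFile jar = new ZipFile(jarPath)) {
            Enumeration<? extends ZipEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.startsWith(prefix)) {
                    matches.add(name);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) throws IOException {
        String jar = args.length > 0 ? args[0] : "hive-exec-0.11.0-shark-0.9.1.jar";
        List<String> left = entriesUnder(jar, "com/google/protobuf/");
        System.out.println(left.isEmpty()
                ? "No protobuf classes left in " + jar
                : left.size() + " protobuf entries still present in " + jar);
    }
}
```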

That’s it. Restart “shark” and it should work now.

Installing Single Node Hadoop 2.4 on CentOS 6.3

There are lots of ways to install Hadoop, and Cloudera is one of the easiest. However, if you just want to try Hadoop on a virtual machine, running Cloudera on top will eat a lot of your memory. It is much better to install vanilla Hadoop from its official website.

This post will focus on getting Hadoop running on CentOS without any third party software.

Configure the “hadoop” user

The following steps assume that you are logged in as the “root” user.

[root@hadoop-spark-vm root]$ adduser hadoop

This should add a new user named “hadoop” and a group called “hadoop”.

Set the “hadoop” user’s password:

[root@hadoop-spark-vm root]$ passwd hadoop

Give the “hadoop” user “sudo” access: open ‘/etc/sudoers’ (preferably with visudo, which checks the syntax before saving) and add the following line:

hadoop ALL=(ALL) ALL

Now log in as the “hadoop” user:

[root@hadoop-spark-vm root]$ su - hadoop

Set up passwordless SSH

Run the following commands:

[hadoop@hadoop-spark-vm hadoop]$ ssh-keygen -t rsa -P ''
[hadoop@hadoop-spark-vm hadoop]$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Install Java

[hadoop@hadoop-spark-vm hadoop]$ sudo yum install java-1.7.0-openjdk*

After the installation, verify Java:

[hadoop@hadoop-spark-vm hadoop]$ java -version
java version "1.7.0_55"
OpenJDK Runtime Environment (rhel- u55-b13)
OpenJDK 64-Bit Server VM (build 24.51-b03, mixed mode)

The /etc/alternatives folder contains a link to the Java installation:

[hadoop@hadoop-spark-vm hadoop]$ ll /etc/alternatives/java
lrwxrwxrwx 1 root root 46 May 21 12:06 /etc/alternatives/java -> /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java

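Set the JAVA_HOME environment variable by updating ~/.bashrc. It must point to the installation directory (the part of the link target before /bin/java), not to the java binary itself:

export JAVA_HOME=/usr/lib/jvm/jre-1.7.0-openjdk.x86_64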
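A minimal sketch of deriving the JAVA_HOME value from the /etc/alternatives/java link shown above: it reads one level of the symlink (the same target that `ll` prints) and strips the trailing bin/java. Reading only one level is a deliberate choice, so the result stays on the stable jre-1.7.0-openjdk.x86_64 name rather than a version-specific directory; the link location is assumed to be the CentOS default.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class JavaHomeFinder {

    /** Strips the trailing "bin/java" from a java executable path to get the JAVA_HOME directory. */
    static Path javaHomeOf(Path javaExecutable) {
        Path bin = javaExecutable.getParent();
        boolean looksRight = javaExecutable.getFileName() != null
                && "java".equals(javaExecutable.getFileName().toString())
                && bin != null && bin.getFileName() != null
                && "bin".equals(bin.getFileName().toString())
                && bin.getParent() != null;
        if (!looksRight) {
            throw new IllegalArgumentException("Expected a path ending in bin/java: " + javaExecutable);
        }
        return bin.getParent();
    }

    public static void main(String[] args) throws IOException {
        Path link = Paths.get("/etc/alternatives/java");
        Path target = Files.readSymbolicLink(link); // one level only, as shown by "ll"
        if (!target.isAbsolute()) {
            target = link.getParent().resolve(target); // relative links are relative to the link's folder
        }
        System.out.println("export JAVA_HOME=" + javaHomeOf(target));
    }
}
```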

Now we are ready to install Hadoop.

Download Hadoop’s binary package:

[hadoop@hadoop-spark-vm hadoop]$ cd ~
[hadoop@hadoop-spark-vm hadoop]$ wget http://apache.mirror.uber.com.au/hadoop/common/current/hadoop-2.4.0.tar.gz # download
[hadoop@hadoop-spark-vm hadoop]$ tar xzvf hadoop-2.4.0.tar.gz # un-tar the archive
[hadoop@hadoop-spark-vm hadoop]$ sudo mv hadoop-2.4.0 /usr/local/hadoop # move un-tar-ed directory to /usr/local/hadoop
[hadoop@hadoop-spark-vm hadoop]$ sudo chown -R hadoop:hadoop /usr/local/hadoop # change the ownership of /usr/local/hadoop to "hadoop" user and group

Next, create the namenode and datanode folders:

[hadoop@hadoop-spark-vm hadoop]$ mkdir -p ~/hadoopspace/hdfs/namenode
[hadoop@hadoop-spark-vm hadoop]$ mkdir -p ~/hadoopspace/hdfs/datanode

Configuring Hadoop

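Add the following lines to ~/.bashrc to set up the environment variables for Hadoop (the PATH entries let you run the hdfs, start-dfs.sh and start-yarn.sh commands used below from any directory):

export HADOOP_INSTALL=/usr/local/hadoop
export PATH=$PATH:$HADOOP_INSTALL/bin
export PATH=$PATH:$HADOOP_INSTALL/sbin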

Load the environment variables immediately:

[hadoop@hadoop-spark-vm hadoop]$ source ~/.bashrc

Go to the “/usr/local/hadoop/etc/hadoop/” directory and update the XML files one by one:


[hadoop@hadoop-spark-vm hadoop]$ cp mapred-site.xml.template mapred-site.xml
[hadoop@hadoop-spark-vm hadoop]$ vi mapred-site.xml

And add the following between the configuration tags:



Add the following between the configuration tags:



Add the following between the configuration tags:







Add an entry for JAVA_HOME in hadoop-env.sh (in the same directory):

export JAVA_HOME=/usr/lib/jvm/jre-1.7.0-openjdk.x86_64/
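Before formatting the namenode, it can help to sanity-check the edited XML files: a malformed file makes the parser throw, and a well-formed one prints its name/value pairs so typos in property names stand out. This is a minimal sketch using the JDK's built-in DOM parser; it assumes the usual Hadoop 2 file names core-site.xml, hdfs-site.xml, mapred-site.xml and yarn-site.xml in /usr/local/hadoop/etc/hadoop, each holding property elements with name and value children.

```java
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class SiteXmlReader {

    /** Reads <property><name>..</name><value>..</value></property> pairs, in file order. */
    static Map<String, String> properties(File siteXml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(siteXml);
        Map<String, String> result = new LinkedHashMap<String, String>();
        NodeList props = doc.getElementsByTagName("property");
        for (int i = 0; i < props.getLength(); i++) {
            Element prop = (Element) props.item(i);
            result.put(text(prop, "name"), text(prop, "value"));
        }
        return result;
    }

    /** Trimmed text of the first child element with the given tag, or null if it is absent. */
    private static String text(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }

    public static void main(String[] args) throws Exception {
        File dir = new File("/usr/local/hadoop/etc/hadoop");
        for (String name : new String[] {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml"}) {
            System.out.println("== " + name);
            for (Map.Entry<String, String> e : properties(new File(dir, name)).entrySet()) {
                System.out.println(e.getKey() + " = " + e.getValue());
            }
        }
    }
}
```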

Next, format the namenode and start HDFS and YARN by running the following commands one by one:

hdfs namenode -format
start-dfs.sh
start-yarn.sh

You should see output similar to the following:

[hadoop@hadoop-spark-vm hadoop]$ hdfs namenode -format
14/05/21 12:33:50 INFO namenode.NameNode: STARTUP_MSG:
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = hadoop-spark-vm/
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.0
STARTUP_MSG: classpath = /usr/local/hadoop/etc/hadoop:/usr/local/hadoop/share/hadoop/common/lib/zookeeper-3.4.5.jar:/usr/local/hadoop/share/hadoop/common/lib/stax-api-1.0-2.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-beanutils-1.7.0.jar:/usr/local/hadoop/share/hadoop/common/lib/jettison-1.1.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-httpclient-3.1.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:/usr/local/hadoop/share/hadoop/common/lib/servlet-api-2.5.jar:/usr/local/hadoop/share/hadoop/common/lib/jackson-jaxrs-1.8.8.jar:/usr/local/hadoop/share/hadoop/common/lib/java-xmlbuilder-0.4.jar:/usr/local/hadoop/share/hadoop/common/lib/jasper-runtime-5.5.23.jar:/usr/local/hadoop/share/hadoop/common/lib/jackson-mapper-asl-1.8.8.jar:/usr/local/hadoop/share/hadoop/common/lib/httpclient-4.2.5.jar:/usr/local/hadoop/share/hadoop/common/lib/mockito-all-1.8.5.jar:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-logging-1.1.3.jar:/usr/local/hadoop/share/hadoop/common/lib/jaxb-impl-2.2.3-1.jar:/usr/local/hadoop/share/hadoop/common/lib/paranamer-2.3.jar:/usr/local/hadoop/share/hadoop/common/lib/hadoop-auth-2.4.0.jar:/usr/local/hadoop/share/hadoop/common/lib/jetty-6.1.26.jar:/usr/local/hadoop/share/hadoop/common/lib/jaxb-api-2.2.2.jar:/usr/local/hadoop/share/hadoop/common/lib/jsr305-1.3.9.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-net-3.1.jar:/usr/local/hadoop/share/hadoop/common/lib/slf4j-api-1.7.5.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-codec-1.4.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-compress-1.4.1.jar:/usr/local/hadoop/share/hadoop/common/lib/avro-1.7.4.jar:/usr/local/hadoop/share/hadoop/common/lib/xmlenc-0.52.jar:/usr/local/hadoop/share/hadoop/common/lib/httpcore-4.2.5.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-configuration-1.6.jar:/usr/local/hadoop/share/hadoop/common/lib/jackson-core-asl-1.8.8.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-io-2.4.jar:/usr/local/hadoop/share/hadoop/common/lib/guava-11.0.2.jar:/usr/local/hadoop/share/hadoop/common/lib/jasper-compiler-5.5.23.jar:/usr/local/hadoop/share/hadoop/common/lib/activation-1.1.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-lang-2.6.jar:/usr/local/hadoop/share/hadoop/common/lib/jersey-json-1.9.jar:/usr/local/hadoop/share/hadoop/common/lib/jets3t-0.9.0.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-beanutils-core-1.8.0.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-collections-3.2.1.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-math3-3.1.1.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-el-1.0.jar:/usr/local/hadoop/share/hadoop/common/lib/asm-3.2.jar:/usr/local/hadoop/share/hadoop/common/lib/jersey-core-1.9.jar:/usr/local/hadoop/share/hadoop/common/lib/netty-3.6.2.Final.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-digester-1.8.jar:/usr/local/hadoop/share/hadoop/common/lib/jersey-server-1.9.jar:/usr/local/hadoop/share/hadoop/common/lib/snappy-java-*.jar
STARTUP_MSG: build = http://svn.apache.org/repos/asf/hadoop/common -r 1583262; compiled by 'jenkins' on 2014-03-31T08:29Z
STARTUP_MSG: java = 1.7.0_55
14/05/21 12:33:50 INFO namenode.NameNode: registered UNIX signal handlers for [TERM, HUP, INT]
14/05/21 12:33:50 INFO namenode.NameNode: createNameNode [-format]
14/05/21 12:33:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Formatting using clusterid: CID-fcb97223-8452-4534-bf0d-23a9c0ce42df
14/05/21 12:33:51 INFO namenode.FSNamesystem: fsLock is fair:true
14/05/21 12:33:51 INFO namenode.HostFileManager: read includes:
14/05/21 12:33:51 INFO namenode.HostFileManager: read excludes:
14/05/21 12:33:51 INFO blockmanagement.DatanodeManager: dfs.block.invalidate.limit=1000
14/05/21 12:33:51 INFO blockmanagement.DatanodeManager: dfs.namenode.datanode.registration.ip-hostname-check=true
14/05/21 12:33:51 INFO util.GSet: Computing capacity for map BlocksMap
14/05/21 12:33:51 INFO util.GSet: VM type = 64-bit
14/05/21 12:33:51 INFO util.GSet: 2.0% max memory 966.7 MB = 19.3 MB
14/05/21 12:33:51 INFO util.GSet: capacity = 2^21 = 2097152 entries
14/05/21 12:33:51 INFO blockmanagement.BlockManager: dfs.block.access.token.enable=false
14/05/21 12:33:51 INFO blockmanagement.BlockManager: defaultReplication = 1
14/05/21 12:33:51 INFO blockmanagement.BlockManager: maxReplication = 512
14/05/21 12:33:51 INFO blockmanagement.BlockManager: minReplication = 1
14/05/21 12:33:51 INFO blockmanagement.BlockManager: maxReplicationStreams = 2
14/05/21 12:33:51 INFO blockmanagement.BlockManager: shouldCheckForEnoughRacks = false
14/05/21 12:33:51 INFO blockmanagement.BlockManager: replicationRecheckInterval = 3000
14/05/21 12:33:51 INFO blockmanagement.BlockManager: encryptDataTransfer = false
14/05/21 12:33:51 INFO blockmanagement.BlockManager: maxNumBlocksToLog = 1000
14/05/21 12:33:51 INFO namenode.FSNamesystem: fsOwner = hadoop (auth:SIMPLE)
14/05/21 12:33:51 INFO namenode.FSNamesystem: supergroup = supergroup
14/05/21 12:33:51 INFO namenode.FSNamesystem: isPermissionEnabled = true
14/05/21 12:33:51 INFO namenode.FSNamesystem: HA Enabled: false
14/05/21 12:33:51 INFO namenode.FSNamesystem: Append Enabled: true
14/05/21 12:33:51 INFO util.GSet: Computing capacity for map INodeMap
14/05/21 12:33:51 INFO util.GSet: VM type = 64-bit
14/05/21 12:33:51 INFO util.GSet: 1.0% max memory 966.7 MB = 9.7 MB
14/05/21 12:33:51 INFO util.GSet: capacity = 2^20 = 1048576 entries
14/05/21 12:33:51 INFO namenode.NameNode: Caching file names occuring more than 10 times
14/05/21 12:33:51 INFO util.GSet: Computing capacity for map cachedBlocks
14/05/21 12:33:51 INFO util.GSet: VM type = 64-bit
14/05/21 12:33:51 INFO util.GSet: 0.25% max memory 966.7 MB = 2.4 MB
14/05/21 12:33:51 INFO util.GSet: capacity = 2^18 = 262144 entries
14/05/21 12:33:51 INFO namenode.FSNamesystem: dfs.namenode.safemode.threshold-pct = 0.9990000128746033
14/05/21 12:33:51 INFO namenode.FSNamesystem: dfs.namenode.safemode.min.datanodes = 0
14/05/21 12:33:51 INFO namenode.FSNamesystem: dfs.namenode.safemode.extension = 30000
14/05/21 12:33:51 INFO namenode.FSNamesystem: Retry cache on namenode is enabled
14/05/21 12:33:51 INFO namenode.FSNamesystem: Retry cache will use 0.03 of total heap and retry cache entry expiry time is 600000 millis
14/05/21 12:33:51 INFO util.GSet: Computing capacity for map NameNodeRetryCache
14/05/21 12:33:51 INFO util.GSet: VM type = 64-bit
14/05/21 12:33:51 INFO util.GSet: 0.029999999329447746% max memory 966.7 MB = 297.0 KB
14/05/21 12:33:51 INFO util.GSet: capacity = 2^15 = 32768 entries
14/05/21 12:33:51 INFO namenode.AclConfigFlag: ACLs enabled? false
Re-format filesystem in Storage Directory /home/hadoop/hadoopspace/hdfs/namenode ? (Y or N) Y
14/05/21 12:34:10 INFO namenode.FSImage: Allocated new BlockPoolId: BP-614723417-
14/05/21 12:34:10 INFO common.Storage: Storage directory /home/hadoop/hadoopspace/hdfs/namenode has been successfully formatted.
14/05/21 12:34:10 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
14/05/21 12:34:10 INFO util.ExitUtil: Exiting with status 0
14/05/21 12:34:10 INFO namenode.NameNode: SHUTDOWN_MSG:
SHUTDOWN_MSG: Shutting down NameNode at hadoop-spark-vm/

[hadoop@hadoop-spark-vm hadoop]$ start-dfs.sh
14/05/21 12:35:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/hadoop/logs/hadoop-hadoop-namenode-hadoop-spark-vm.out
localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-hadoop-datanode-hadoop-spark-vm.out
Starting secondary namenodes []
The authenticity of host ' (' can't be established.
RSA key fingerprint is 6b:bc:6d:99:e5:88:52:29:07:cc:e3:bb:b9:dd:af:0b.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '' (RSA) to the list of known hosts.
starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-hadoop-secondarynamenode-hadoop-spark-vm.out
14/05/21 12:35:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[hadoop@hadoop-spark-vm hadoop]$ start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /usr/local/hadoop/logs/yarn-hadoop-resourcemanager-hadoop-spark-vm.out
localhost: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-hadoop-nodemanager-hadoop-spark-vm.out

Run the jps command and verify that the following processes are running:

[hadoop@hadoop-spark-vm hadoop]$ jps
2795 NodeManager
2326 NameNode
2576 SecondaryNameNode
2434 DataNode
2710 ResourceManager
3091 Jps
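If you want to script this check, the sketch below runs jps and reports which of the five daemons listed above are missing. It assumes jps is on the PATH and that you run the program as the “hadoop” user, since jps only reports JVMs it has permission to access.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class JpsCheck {

    // The five daemons from the jps output above for a single-node HDFS + YARN setup.
    static final List<String> EXPECTED = Arrays.asList(
            "NameNode", "DataNode", "SecondaryNameNode", "ResourceManager", "NodeManager");

    /** Returns the expected daemons that do not appear in the given jps output. */
    static List<String> missingDaemons(String jpsOutput) {
        Set<String> running = new HashSet<String>();
        for (String line : jpsOutput.split("\\r?\\n")) {
            String[] parts = line.trim().split("\\s+"); // each line is "<pid> <main class name>"
            if (parts.length >= 2) {
                running.add(parts[1]);
            }
        }
        List<String> missing = new ArrayList<String>();
        for (String daemon : EXPECTED) {
            if (!running.contains(daemon)) {
                missing.add(daemon);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws Exception {
        Process jps = new ProcessBuilder("jps").redirectErrorStream(true).start();
        StringBuilder output = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(jps.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                output.append(line).append('\n');
            }
        }
        jps.waitFor();
        List<String> missing = missingDaemons(output.toString());
        System.out.println(missing.isEmpty() ? "All Hadoop daemons are running" : "Not running: " + missing);
    }
}
```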

Now you can try to create a directory in HDFS by running:

[hadoop@hadoop-spark-vm hadoop]$ hadoop fs -mkdir /test

And verify that it was created by running:

[hadoop@hadoop-spark-vm hadoop]$ hadoop fs -ls /
drwxr-xr-x - hadoop supergroup 0 2014-05-21 14:21 /test

I followed this tutorial on AJ’S DATA STORAGE TUTORIALS and modified it to suit my own environment.

Hive Export/Import Command – Transferring Data Between Hive Instances

When working with Hive, there are many scenarios in which we need to move data from one cluster to another. For instance, we sometimes need to copy production data into an alpha/dev environment for testing. Luckily, Hive provides two easy commands for this.

Since version 0.8, Hive supports the EXPORT and IMPORT commands, which let you export a table’s metadata together with its data to a directory in HDFS; that directory can then be imported into another database or Hive instance.

The command looks like this:

EXPORT TABLE <table_name> TO 'path/to/hdfs';

Please note that the path should not start with “/”; otherwise Hive will complain about an invalid path or that the target is not an empty directory. The data will be written to the /user/<user_name>/path/to/hdfs directory in HDFS, which of course needs to be writable by the current user.

The next step is to copy the data across to the other Hive instance using Hadoop’s “distcp” command:

hadoop distcp hdfs://:8020/path/to/hdfs hdfs:///path/to/hdfs

Once the copy is finished, you can then use the IMPORT command on the new cluster to load the data into a new table:

IMPORT TABLE <table_name> FROM 'path/to/another/hdfs';
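The three steps can be scripted per table. The helper below is hypothetical: it only builds the command strings (the first runs in Hive on the source cluster, the second in a shell, the third in Hive on the target cluster). It assumes the same user runs EXPORT and IMPORT on both clusters, that both namenodes listen on port 8020, and that a relative path resolves under /user/<user_name> as described above; the host names are placeholders you supply.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that builds the EXPORT, distcp and IMPORT steps for one table. */
final class HiveTableTransfer {

    static List<String> steps(String table, String relPath, String user, String sourceNn, String targetNn) {
        if (relPath.startsWith("/")) {
            throw new IllegalArgumentException("Export path should be relative: " + relPath);
        }
        // Where the relative path ends up in HDFS, following the note above.
        String hdfsDir = "/user/" + user + "/" + relPath;
        return Arrays.asList(
                "EXPORT TABLE " + table + " TO '" + relPath + "';",
                "hadoop distcp hdfs://" + sourceNn + ":8020" + hdfsDir + " hdfs://" + targetNn + ":8020" + hdfsDir,
                "IMPORT TABLE " + table + " FROM '" + relPath + "';");
    }

    public static void main(String[] args) {
        // All values below are placeholders for illustration.
        for (String step : steps("my_table", "export/my_table", "hadoop", "source-namenode", "target-namenode")) {
            System.out.println(step);
        }
    }
}
```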

Now the new cluster has exactly the same data as the old one. The transfer should be smooth; the only caveat is that if the data is big, “distcp” might take a while to copy it across.

For more information, have a look at the Apache Hive Confluence page.

Hive User Defined Aggregation Function (UDAF)

Sometimes the Hive query you want to write cannot be expressed easily using Hive’s built-in functions. By letting you write a user-defined function (UDF), Hive makes it easy to plug in your own processing code and invoke it from a Hive query.

There are three types of user-defined functions you can have in Hive:

  1. UDF – normal user-defined functions, simplest to write
  2. UDAF – user-defined aggregation functions, the ones typically used in the “group by” case
  3. UDTF – user-defined table functions, like Hive’s built-in “explode” function

This post will give you an example of how to write UDAF functions. Here is the code:

package com.effectivemeasure.hive.udaf;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

import java.util.HashMap;

/**
 * This class will group key value pairs and return a Map<Integer, Integer>, based on group by fields.
 * For example, query "SELECT visitor_id, GROUP_MAP(choice_id, question_id) FROM table GROUP BY visitor_id"
 * will be able to return the following values:
 * 2f5d017334da977740ee723-61885258 -> {3165:411,2:1,3162:410,3159:409}
 * @author Eric Lin
 */
public final class GroupMap extends UDAF {

    public static class Evaluator implements UDAFEvaluator {
        private HashMap<Integer, Integer> buffer;

        public Evaluator() {
            // Make sure the buffer exists even before Hive calls init()
            init();
        }

        /**
         * Initializes the evaluator and resets its internal state.
         */
        public void init() {
            buffer = new HashMap<Integer, Integer>();
        }

        /**
         * This function is called every time there is a new value to be aggregated.
         * The parameters are the same parameters that are passed when the function is called in a Hive query.
         * @param key Integer
         * @param value Integer
         * @return boolean
         */
        public boolean iterate(Integer key, Integer value) {
            // Keep the first value seen for each key
            if (!buffer.containsKey(key)) {
                buffer.put(key, value);
            }
            return true;
        }

        /**
         * Function called when separate jobs are done on different data nodes (partial aggregation).
         * @return HashMap
         */
        public HashMap<Integer, Integer> terminatePartial() {
            return buffer;
        }

        /**
         * Function called when merging the results calculated on all data nodes.
         * @param another HashMap
         * @return boolean
         */
        public boolean merge(HashMap<Integer, Integer> another) {
            // null might be passed in case there is no input data.
            if (another == null) {
                return true;
            }

            for (Integer key : another.keySet()) {
                if (!buffer.containsKey(key)) {
                    buffer.put(key, another.get(key));
                }
            }
            return true;
        }

        /**
         * This function is called when the final result of the aggregation is needed.
         * @return HashMap, or null if nothing was aggregated
         */
        public HashMap<Integer, Integer> terminate() {
            if (buffer.size() == 0) {
                return null;
            }
            return buffer;
        }
    }
}

Key points:

  • A UDAF must be a subclass of org.apache.hadoop.hive.ql.exec.UDAF
  • Contain one or more nested static classes implementing org.apache.hadoop.hive.ql.exec.UDAFEvaluator
  • I have explained the five required functions “init”, “iterate”, “terminatePartial”, “merge” and “terminate” in the code comments.
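To see how these five methods fit together without a Hive cluster, here is a simplified, stand-alone model of the call order: two "mapper" evaluators each run init and iterate over their own rows and hand over terminatePartial, and a "reducer" evaluator merges both partials and calls terminate. The Evaluator below copies GroupMap's logic but drops the Hive base classes so it runs with the JDK alone; it is a sketch of the lifecycle, not Hive's actual execution engine, and the sample pairs come from the Javadoc example above (plus one duplicate key to show that the first value wins).

```java
import java.util.HashMap;
import java.util.Map;

final class GroupMapSimulation {

    /** Same logic as GroupMap.Evaluator, without the Hive UDAF/UDAFEvaluator types. */
    static final class Evaluator {
        private HashMap<Integer, Integer> buffer;

        Evaluator() { init(); }

        void init() { buffer = new HashMap<Integer, Integer>(); }

        boolean iterate(Integer key, Integer value) {
            if (!buffer.containsKey(key)) {
                buffer.put(key, value); // first value per key wins
            }
            return true;
        }

        HashMap<Integer, Integer> terminatePartial() { return buffer; }

        boolean merge(HashMap<Integer, Integer> another) {
            if (another == null) {
                return true;
            }
            for (Map.Entry<Integer, Integer> e : another.entrySet()) {
                if (!buffer.containsKey(e.getKey())) {
                    buffer.put(e.getKey(), e.getValue());
                }
            }
            return true;
        }

        HashMap<Integer, Integer> terminate() { return buffer.isEmpty() ? null : buffer; }
    }

    /** Aggregates two partitions of (choice_id, question_id) pairs, then merges them in a "reducer". */
    static HashMap<Integer, Integer> run(int[][] partitionA, int[][] partitionB) {
        Evaluator mapperA = new Evaluator();
        for (int[] row : partitionA) {
            mapperA.iterate(row[0], row[1]);
        }
        Evaluator mapperB = new Evaluator();
        for (int[] row : partitionB) {
            mapperB.iterate(row[0], row[1]);
        }
        Evaluator reducer = new Evaluator();
        reducer.merge(mapperA.terminatePartial());
        reducer.merge(mapperB.terminatePartial());
        return reducer.terminate();
    }

    public static void main(String[] args) {
        int[][] a = {{3165, 411}, {2, 1}};
        int[][] b = {{3162, 410}, {3159, 409}, {2, 99}};
        System.out.println(run(a, b));
    }
}
```

Because merge keeps whichever value arrives first, the result for a duplicated key depends on the order in which partials are merged, which Hive does not guarantee.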

To run it:

  1. Compile the JAVA code to generate the JAR file
  2. Copy the JAR file to the machine where you run Hive (e.g. the namenode), under any location such as /tmp
  3. Hive command “ADD JAR /tmp/my-udaf.jar”
  4. Hive command “CREATE TEMPORARY FUNCTION group_map AS 'com.effectivemeasure.hive.udaf.GroupMap'”
  5. Finally simply use the function as normal in Hive query: “SELECT id, GROUP_MAP(choice_id, question_id) FROM table GROUP BY id”

That’s it, now you have a custom UDAF running on your cluster.