How to Index LZO files in Hadoop

Today I was trying to index an LZO file in Hadoop using the following command:

hadoop jar /opt/cloudera/parcels/GPLEXTRAS-5.7.0-1.cdh5.7.0.p0.40/lib/hadoop/lib/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer /tmp/lzo_test

However, it failed with the following error:

16/09/10 03:05:51 INFO mapreduce.Job: Task Id : attempt_1473404927068_0005_m_000000_0, Status : FAILED
Error: java.lang.NullPointerException
       	at com.hadoop.mapreduce.LzoSplitRecordReader.initialize(LzoSplitRecordReader.java:50)
       	at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:548)
       	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:786)
       	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
       	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
       	at java.security.AccessController.doPrivileged(Native Method)
       	at javax.security.auth.Subject.doAs(Subject.java:415)
       	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
       	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

After some research, it turned out that the LZO codec was not enabled in my cluster. I did the following to resolve the issue:

– Add the gplextras parcel to Cloudera Manager’s parcel configuration:
https://archive.cloudera.com/gplextras5/parcels/x.x.x

where x.x.x should match your CDH version, which in my case is 5.7.0
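
For example, with CDH 5.7.0 the parcel repository URL would be along the lines of the following (adjust the version to match your own parcels; shown purely as an illustration):

https://archive.cloudera.com/gplextras5/parcels/5.7.0/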

(Screenshot: adding the GPL Extras parcel repository URL in Cloudera Manager’s parcel configuration)

– Install the GPL Extras parcel through Cloudera Manager as normal
– Add the following codecs to the Compression Codecs (io.compression.codecs) property in Cloudera Manager > HDFS > Configuration:

com.hadoop.compression.lzo.LzopCodec
com.hadoop.compression.lzo.LzoCodec

(Screenshot: the io.compression.codecs setting in Cloudera Manager’s HDFS configuration)

– Restart Cluster
– Deploy Client Configuration
– Install the native-lzo library on all hosts in the cluster:

sudo yum install lzop
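
To double-check the deployed client configuration and to prepare a small test file to index, something like the following can be used (paths and file names are just examples that match the /tmp/lzo_test directory used above):

# the deployed client configuration should now list the LZO codecs
hdfs getconf -confKey io.compression.codecs

# create a small file, compress it with lzop and upload it to HDFS
seq 1 100000 > test.txt
lzop test.txt                      # produces test.txt.lzo
hdfs dfs -mkdir -p /tmp/lzo_test
hdfs dfs -put test.txt.lzo /tmp/lzo_test/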

After the above changes, the index job should finish without issues:

hadoop jar /opt/cloudera/parcels/GPLEXTRAS-5.7.0-1.cdh5.7.0.p0.40/lib/hadoop/lib/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer /tmp/lzo_test
16/09/10 03:22:10 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
16/09/10 03:22:10 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 820f83d05b8d916e89dbb72d6ef129113b277303]
16/09/10 03:22:12 INFO lzo.DistributedLzoIndexer: Adding LZO file hdfs://host-10-17-101-195.coe.cloudera.com:8020/tmp/lzo_test/test.txt.lzo to indexing list (no index currently exists)
16/09/10 03:22:12 INFO Configuration.deprecation: mapred.map.tasks.speculative.execution is deprecated. Instead, use mapreduce.map.speculative
16/09/10 03:22:12 INFO client.RMProxy: Connecting to ResourceManager at host-10-17-101-195.coe.cloudera.com/10.17.101.195:8032
16/09/10 03:22:12 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 119 for hive on 10.17.101.195:8020
16/09/10 03:22:12 INFO security.TokenCache: Got dt for hdfs://host-10-17-101-195.coe.cloudera.com:8020; Kind: HDFS_DELEGATION_TOKEN, Service: 10.17.101.195:8020, Ident: (HDFS_DELEGATION_TOKEN token 119 for hive)
16/09/10 03:22:12 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/09/10 03:22:12 INFO input.FileInputFormat: Total input paths to process : 1
16/09/10 03:22:13 INFO mapreduce.JobSubmitter: number of splits:1
16/09/10 03:22:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1473502403576_0002
16/09/10 03:22:13 INFO mapreduce.JobSubmitter: Kind: HDFS_DELEGATION_TOKEN, Service: 10.17.101.195:8020, Ident: (HDFS_DELEGATION_TOKEN token 119 for hive)
16/09/10 03:22:13 INFO impl.YarnClientImpl: Submitted application application_1473502403576_0002
16/09/10 03:22:13 INFO mapreduce.Job: The url to track the job: http://host-10-17-101-195.coe.cloudera.com:8088/proxy/application_1473502403576_0002/
16/09/10 03:22:13 INFO mapreduce.Job: Running job: job_1473502403576_0002
16/09/10 03:22:22 INFO mapreduce.Job: Job job_1473502403576_0002 running in uber mode : false
16/09/10 03:22:22 INFO mapreduce.Job:  map 0% reduce 0%
16/09/10 03:22:35 INFO mapreduce.Job:  map 22% reduce 0%
16/09/10 03:22:38 INFO mapreduce.Job:  map 36% reduce 0%
16/09/10 03:22:41 INFO mapreduce.Job:  map 50% reduce 0%
16/09/10 03:22:44 INFO mapreduce.Job:  map 62% reduce 0%
16/09/10 03:22:47 INFO mapreduce.Job:  map 76% reduce 0%
16/09/10 03:22:50 INFO mapreduce.Job:  map 90% reduce 0%
16/09/10 03:22:53 INFO mapreduce.Job:  map 100% reduce 0%
16/09/10 03:22:53 INFO mapreduce.Job: Job job_1473502403576_0002 completed successfully
16/09/10 03:22:53 INFO mapreduce.Job: Counters: 30
       	File System Counters
       		FILE: Number of bytes read=0
       		FILE: Number of bytes written=118080
       		FILE: Number of read operations=0
       		FILE: Number of large read operations=0
       		FILE: Number of write operations=0
       		HDFS: Number of bytes read=85196
       		HDFS: Number of bytes written=85008
       		HDFS: Number of read operations=2
       		HDFS: Number of large read operations=0
       		HDFS: Number of write operations=4
       	Job Counters
       		Launched map tasks=1
       		Data-local map tasks=1
       		Total time spent by all maps in occupied slots (ms)=28846
       		Total time spent by all reduces in occupied slots (ms)=0
       		Total time spent by all map tasks (ms)=28846
       		Total vcore-seconds taken by all map tasks=28846
       		Total megabyte-seconds taken by all map tasks=29538304
       	Map-Reduce Framework
       		Map input records=10626
       		Map output records=10626
       		Input split bytes=138
       		Spilled Records=0
       		Failed Shuffles=0
       		Merged Map outputs=0
       		GC time elapsed (ms)=32
       		CPU time spent (ms)=3960
       		Physical memory (bytes) snapshot=233476096
       		Virtual memory (bytes) snapshot=1564155904
       		Total committed heap usage (bytes)=176160768
       	File Input Format Counters
       		Bytes Read=85058
       	File Output Format Counters
       		Bytes Written=0
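
Once the job completes, each LZO file should have a matching .index file next to it, which can be confirmed with a simple listing (paths follow the test file used above):

hdfs dfs -ls /tmp/lzo_test
# expect test.txt.lzo.index to appear alongside test.txt.lzo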

Securely Managing Passwords In Sqoop

Apache Sqoop became a Top-Level Project at Apache in March 2012. Since then, Sqoop has developed a lot and become very popular in the Hadoop ecosystem. In this post, I will cover ways to specify database passwords to Sqoop securely.

The following are two common ways to pass a database password to Sqoop:

sqoop import --connect jdbc:mysql://myexample.com/test \
             --username myuser -P \
             --table mytable
sqoop import --connect jdbc:mysql://myexample.com/test \
             --username myuser \
             --password mypassword \
             --table mytable

The first option is secure because other people cannot see the password, but it prompts for the password interactively, so it is only practical when running Sqoop by hand from the command line.

The second option is clearly insecure, as anyone who can see the command (for example in the process list or shell history) can read the database password.

A more secure way to pass the password is through a so-called password file. The commands are as follows:

echo -n "password" > /home/ericlin/.mysql.password
chmod 400 /home/ericlin/.mysql.password
sqoop import --connect jdbc:mysql://myexample.com/test \
             --username myuser \
             --password-file /home/ericlin/.mysql.password \
             --table mytable

Please note that the “-n” option for the “echo” command is required so that no newline is added to the end of the password. Also, do not use “vim” to create the file, because “vim” automatically appends a newline to the end of the file, which will cause Sqoop to fail since the password would then contain a newline character.
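
If in doubt, a quick sanity check on the file (path as in the example above) is:

wc -c /home/ericlin/.mysql.password    # byte count should equal the password length
od -c /home/ericlin/.mysql.password    # output should not end with \n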

However, storing the password in a plain text file is still not considered secure, even with restricted permissions. As of Sqoop 1.4.5, Sqoop supports reading passwords from a Java keystore, so you do not need to store them in clear text in a file.

To generate the key:

[ericlin@localhost ~] $ hadoop credential create mydb.password.alias -provider jceks://hdfs/user/ericlin/mysql.password.jceks
Enter password: 
Enter password again: 
mydb.password.alias has been successfully created.
org.apache.hadoop.security.alias.JavaKeyStoreProvider has been updated.

At the prompt, enter the password that will be used to access the database.

“mydb.password.alias” is the alias we pass to Sqoop when running the import, so that no clear-text password needs to be supplied on the command line.
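
You can verify that the alias has been stored by listing the credentials in the provider:

hadoop credential list -provider jceks://hdfs/user/ericlin/mysql.password.jceks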

Then you can run the following Sqoop command:

sqoop import -Dhadoop.security.credential.provider.path=jceks://hdfs/user/ericlin/mysql.password.jceks \
             --connect 'jdbc:mysql://myexample.com/test' \
             --table mytable \
             --username myuser \
             --password-alias mydb.password.alias

This way the password is stored inside jceks://hdfs/user/ericlin/mysql.password.jceks and never appears in clear text on the command line.

Hope this helps.

    Hive “INSERT OVERWRITE” Does Not Remove Existing Data

    When Hive performs an “INSERT OVERWRITE” into a partition of an external table under an existing directory, it behaves differently depending on whether the partition definition already exists in the metastore:

    1) if the partition definition does not exist, Hive will not try to guess where the target partition directories are (for either static or dynamic partitions), so it will not be able to delete existing files under the partitions being written to

    2) if the partition definition does exist, Hive will attempt to remove all files under the target partition directory before writing new data into it

    To reproduce the issue, I did the following:

    Log in as the “hdfs” user and run the following commands:

    hdfs dfs -mkdir test
    hdfs dfs -mkdir test/p=p1
    touch test.txt
    hdfs dfs -put test.txt test/p=p1
    

    Confirm that there is one file under test/p=p1

    hdfs dfs -ls test/p=p1
    Found 1 items
    -rw-r--r--   3 hdfs supergroup          5 2015-05-04 17:30 test/p=p1/test.txt
    

    Then start “hive” and run the following statements:

    DROP TABLE IF EXISTS partition_test;
    CREATE EXTERNAL TABLE partition_test (a int) PARTITIONED BY (p string) LOCATION '/user/hdfs/test';
    INSERT OVERWRITE TABLE partition_test PARTITION (p = 'p1') SELECT <int_column> FROM <existing_table>;
    

    The output from the above “INSERT OVERWRITE”:

    Total jobs = 3
    Launching Job 1 out of 3
    Number of reduce tasks is set to 0 since there's no reduce operator
    Starting Job = job_1430100146027_0004, Tracking URL = http://host-10-17-74-166.coe.cloudera.com:8088/proxy/application_1430100146027_0004/
    Kill Command = /opt/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/lib/hadoop/bin/hadoop job  -kill job_1430100146027_0004
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
    2015-05-05 00:15:35,220 Stage-1 map = 0%,  reduce = 0%
    2015-05-05 00:15:48,740 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.19 sec
    MapReduce Total cumulative CPU time: 3 seconds 190 msec
    Ended Job = job_1430100146027_0004
    Stage-4 is selected by condition resolver.
    Stage-3 is filtered out by condition resolver.
    Stage-5 is filtered out by condition resolver.
    Moving data to: hdfs://ha-test/user/hdfs/test/p=p1/.hive-staging_hive_2015-05-05_00-13-47_253_4887262776207257351-1/-ext-10000
    Loading data to table default.partition_test partition (p=p1)
    Partition default.partition_test{p=p1} stats: [numFiles=2, numRows=33178, totalSize=194973, rawDataSize=161787]
    MapReduce Jobs Launched: 
    Stage-Stage-1: Map: 1   Cumulative CPU: 3.19 sec   HDFS Read: 2219273 HDFS Write: 195055 SUCCESS
    Total MapReduce CPU Time Spent: 3 seconds 190 msec
    

    Confirm that test.txt was not removed:

    hdfs dfs -ls test/p=p1
    Found 2 items
    -rwxr-xr-x   3 hdfs supergroup     194965 2015-05-05 00:15 test/p=p1/000000_0
    -rw-r--r--   3 hdfs supergroup          8 2015-05-05 00:10 test/p=p1/test.txt
    

    Rename 000000_0 to 11111111:

    hdfs dfs -mv test/p=p1/000000_0 test/p=p1/11111111
    

    Confirm that there are now two files under test/p=p1:

    hdfs dfs -ls test/p=p1
    Found 2 items
    -rwxr-xr-x   3 hdfs supergroup     194965 2015-05-05 00:15 test/p=p1/11111111
    -rw-r--r--   3 hdfs supergroup          8 2015-05-05 00:10 test/p=p1/test.txt
    

    Run the following query again:

    INSERT OVERWRITE TABLE partition_test PARTITION (p = 'p1') SELECT <int_column> FROM <existing_table>;
    

    The output from second “INSERT OVERWRITE”:

    Total jobs = 3
    Launching Job 1 out of 3
    Number of reduce tasks is set to 0 since there's no reduce operator
    Starting Job = job_1430100146027_0005, Tracking URL = http://host-10-17-74-166.coe.cloudera.com:8088/proxy/application_1430100146027_0005/
    Kill Command = /opt/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/lib/hadoop/bin/hadoop job  -kill job_1430100146027_0005
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
    2015-05-05 00:23:39,298 Stage-1 map = 0%,  reduce = 0%
    2015-05-05 00:23:48,891 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.92 sec
    MapReduce Total cumulative CPU time: 2 seconds 920 msec
    Ended Job = job_1430100146027_0005
    Stage-4 is selected by condition resolver.
    Stage-3 is filtered out by condition resolver.
    Stage-5 is filtered out by condition resolver.
    Moving data to: hdfs://ha-test/user/hdfs/test/p=p1/.hive-staging_hive_2015-05-05_00-21-58_505_3688057093497278728-1/-ext-10000
    Loading data to table default.partition_test partition (p=p1)
    Moved: 'hdfs://ha-test/user/hdfs/test/p=p1/11111111' to trash at: hdfs://ha-test/user/hdfs/.Trash/Current
    Moved: 'hdfs://ha-test/user/hdfs/test/p=p1/test.txt' to trash at: hdfs://ha-test/user/hdfs/.Trash/Current
    Partition default.partition_test{p=p1} stats: [numFiles=1, numRows=33178, totalSize=194965, rawDataSize=161787]
    MapReduce Jobs Launched: 
    Stage-Stage-1: Map: 1   Cumulative CPU: 2.92 sec   HDFS Read: 2219273 HDFS Write: 195055 SUCCESS
    Total MapReduce CPU Time Spent: 2 seconds 920 msec
    

    Finally, confirm that there is only one file under the test/p=p1 directory; both 11111111 and test.txt were moved to the .Trash directory:

    hdfs dfs -ls test/p=p1
    Found 1 items
    -rwxr-xr-x   3 hdfs supergroup       4954 2015-05-04 17:36 test/p=p1/000000_0
    

    The above test confirms that existing files remain in the target partition directory when the table is newly created and the partition definition does not yet exist in the metastore.

    To fix this issue, you can run the following Hive query before the “INSERT OVERWRITE” to recover the missing partition definitions:

    MSCK REPAIR TABLE partition_test;
    
    OK
    Partitions not in metastore:	partition_test:p=p1
    Repair: Added partition to metastore partition_test:p=p1
    Time taken: 0.486 seconds, Fetched: 2 row(s)
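
    After the repair, you can double-check from the shell that the partition is now registered in the metastore (same table name as above):

    hive -e "SHOW PARTITIONS partition_test;"
    # should print: p=p1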
    

      Hadoop Administrator – Cloudera Certified

      I had some drama yesterday when I was trying to set up my laptop for Cloudera’s Hadoop Administrator Exam with Innovative Exams (http://examslocal.com/). I was unable to share my screen in Chrome with the examiner. I spent about 20 minutes trying different things and finally it worked, but I have no idea what the problem was or how I fixed it; it just started working.

      Anyway, the exam went smoothly afterwards and took 90 minutes. I had taken the practice exams many times beforehand, and I have to say there are a lot more tricky questions in the actual exam than in the practice one. Some of the questions, although not many, cannot be answered from the course notes alone, meaning you need plenty of hands-on practice with Hadoop to know the answers.

      I got the score back straightaway after the exam, and guess what, the result was PASS. I will get the actual certification confirmation in about 2 business days. Oh Yeah!

      (Image: Cloudera Certified logo)

      Next up is the Developer certification. Cloudera is in the process of changing the exam structure for the Developer course, and there will be some practical questions, meaning you will have to actually write code rather than answer purely single/multiple-choice questions.

      Wish us luck!!

      Enabling Snappy Support for Shark/Spark

      Shark/Spark cannot read Snappy-compressed data out of the box. In my previous post, I explained how to enable Snappy compression in Hadoop 2.4. Once that is done, enabling Snappy in Spark is dead simple: all you need to do is set an environment variable for it.

      In $SPARK_HOME/conf/spark-env.sh, add the following:

      export SPARK_LIBRARY_PATH=$HADOOP_HOME/lib/native
      

      Of course, this assumes that you have Hadoop’s native Snappy library libsnappy.so in the specified directory.
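
      A quick way to confirm the native library is present in the directory the variable points to (assuming the $HADOOP_HOME/lib/native path used above):

      ls $HADOOP_HOME/lib/native | grep snappy    # should list libsnappy.so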

      After distributing this conf file to all the slaves and restarting the cluster, Shark will be able to create and read Snappy-compressed data through Spark.