Oozie SSH Action Does Not Support Chained Commands – OOZIE-1974

I have seen quite a few CDH users try to run chained Linux commands via Oozie’s SSH Action. An example looks like below:

<action name="sshTest">
  <ssh xmlns="uri:oozie:ssh-action:0.1">
    <host>${sshUserHost}</host>
    <command>kinit test.keytab test@TEST.COM ; python ....</command>
    <capture-output/>
  </ssh>
  <ok to="nextActino"/>
  <error to="kill"/>
</action>

We can see that the command to run on the remote host is as below:

kinit test.keytab test@TEST.COM ; python ....

This is OK if both commands finish successfully very quickly. However, it will cause the SSH action to fail if the python command needs to run for some time, say more than 5-10 minutes. Below are example log messages produced in Oozie’s server log while the SSH action is running:

2018-08-13 10:01:48,215 WARN org.apache.oozie.command.wf.CompletedActionXCommand: SERVER[{oozie-host}] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000234-172707121423674-oozie-oozi-W] ACTION[0000234-172707121423674-oozie-oozi-W@sshTest] Received early callback for action still in PREP state; will wait [10,000]ms and requeue up to [5] more times
2018-08-13 10:01:48,216 WARN org.apache.oozie.command.wf.CompletedActionXCommand: SERVER[{oozie-host}] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000234-172707121423674-oozie-oozi-W] ACTION[0000234-172707121423674-oozie-oozi-W@sshTest] Received early callback for action still in PREP state; will wait [10,000]ms and requeue up to [5] more times

....

2018-08-13 10:02:38,243 ERROR org.apache.oozie.command.wf.CompletedActionXCommand: SERVER[{oozie-host}] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000234-172707121423674-oozie-oozi-W] ACTION[0000234-172707121423674-oozie-oozi-W@sshTest] XException, 
org.apache.oozie.command.CommandException: E0822: Received early callback for action [0000234-172707121423674-oozie-oozi-W@sshTest] while still in PREP state and exhausted all requeues
 at org.apache.oozie.command.wf.CompletedActionXCommand.execute(CompletedActionXCommand.java:114)
 at org.apache.oozie.command.wf.CompletedActionXCommand.execute(CompletedActionXCommand.java:39)
 at org.apache.oozie.command.XCommand.call(XCommand.java:286)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:179)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

The reason for the failure is that Oozie currently does not support chained Linux commands in the SSH Action, which is tracked via upstream JIRA OOZIE-1974.

Below is what happens behind the scenes:

1. The ssh-base.sh and ssh-wrapper.sh files (see https://github.com/apache/oozie/blob/master/core/src/main/resources) are copied to the target host.

2. Oozie runs the command below from the Oozie server, via ssh, directly on the target host:

sh ssh-base.sh FLATTEN_ARGS curl "http://{oozie-host}:11000/oozie/callback?id=0000234-172707121423674-oozie-oozi-W@sshTest&status=#status" \
"--data-binary%%%@#stdout%%%--request%%%POST%%%--header%%%\"content-type:text/plain\"" \
0000234-172707121423674-oozie-oozi-W@sshTest@3 kinit test.keytab test@TEST.COM ; python ....

3. Based on the command above, we can see that the command was rebuilt: the remote shell breaks the full command into two commands:

sh ssh-base.sh FLATTEN_ARGS curl "http://{oozie-host}:11000/oozie/callback?id=0000234-172707121423674-oozie-oozi-W@sshTest&status=#status" \
"--data-binary%%%@#stdout%%%--request%%%POST%%%--header%%%\"content-type:text/plain\"" \
0000234-172707121423674-oozie-oozi-W@sshTest@3 kinit test.keytab test@TEST.COM

and

python ....

This is not the original pair of commands, “kinit test.keytab test@TEST.COM” followed by “python ….”

4. The ssh-base.sh script will in turn run the command below:

sh ssh-wrapper.sh FLATTEN_ARGS curl "http://{oozie-host}:11000/oozie/callback?id=0000234-172707121423674-oozie-oozi-W@sshTest&status=#status" \
"--data-binary%%%@#stdout%%%--request%%%POST%%%--header%%%\"content-type:text/plain\"" \
0000234-172707121423674-oozie-oozi-W@sshTest@3 kinit test.keytab test@TEST.COM

This command finishes very quickly and triggers the callback curl call immediately. However, the “python” command keeps the SSH session running until it completes. This leaves the Oozie action stuck in the PREP state and causes the callback to fail once the requeues are exhausted, because the Oozie action state and the remote job state are no longer consistent.
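
You can reproduce the behaviour with plain ssh (hostname illustrative): the remote shell treats the unquoted semicolon as a command separator, so the second command runs independently of the first:

# The remote shell re-parses the string: "echo one two" completes instantly,
# while "sleep 300" runs as a second, independent command that keeps the
# ssh session open long after the first command has finished:
ssh user@remote-host 'echo one two ; sleep 300'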

So until OOZIE-1974 is fixed, the workaround is to put both commands inside a single script file, make that script available on the remote host, and point the SSH action’s command at it, as sketched below.
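
For example, a minimal wrapper script on the remote host could look like below (the script path, keytab location, and python script are illustrative; note that kinit needs the -kt flag to authenticate with a keytab):

#!/bin/bash
# run_job.sh - wrap the chained commands in one script so that the SSH
# action only ever sees a single command with no semicolons in it.
set -e                                  # abort if any step fails

kinit -kt /home/test/test.keytab test@TEST.COM
python /home/test/my_job.py "$@"        # forward any action arguments

The SSH action then simply references the script:

<command>/home/test/run_job.sh</command>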

Hope the above helps.

WebHCat Request Failed With Error “id: HTTP: no such user”

WebHCat, previously known as Templeton, is the REST API for HCatalog, a table and storage management layer for Hadoop. Users can use WebHCat to access metadata information from HCatalog, as well as to submit jobs for MapReduce, Hive, and Pig.

Below is an example of how to retrieve a list of databases via WebHCat API:

curl --negotiate -u: http://{webhcat-hostname}:50111/templeton/v1/ddl/database/

Please note that port 50111 is the default port number for WebHCat. Sample output looks like below:

{"databases":["default","mytest","s3_test","udf_db"]}

However, I recently faced an issue where a WebHCat request failed with the error below:

2018-08-08 17:18:44,413 WARN org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:hive (auth:PROXY) via HTTP/{webhcat-hostname}@CDH511.COM (auth:KERBEROS) cause:org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset
2018-08-08 17:18:44,414 ERROR org.apache.hive.hcatalog.templeton.CatchallExceptionMapper: java.lang.reflect.UndeclaredThrowableException
java.io.IOException: java.lang.reflect.UndeclaredThrowableException
...
Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset
....
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDelegationToken(HiveMetaStoreClient.java:1882)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDelegationToken(HiveMetaStoreClient.java:1872)
....

From the stack trace, we can see that WebHCat failed when trying to collect a delegation token from the HiveMetaStore (HMS). Checking the HMS server log, I found the error below:

2018-08-08 17:18:19,675 WARN  org.apache.hadoop.security.ShellBasedUnixGroupsMapping: [pool-7-thread-2]: unable to return groups for user HTTP
PartialGroupNameException The user name 'HTTP' is not found. id: HTTP: no such user
id: HTTP: no such user

It is pretty clear that HMS failed because the “HTTP” user was missing. Adding the “HTTP” user on the HMS server host resolved the issue.
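
If your hosts use local users, a minimal sketch of the fix looks like below; if users come from a directory service such as LDAP/SSSD, add the user there instead:

# Create a local, non-login "HTTP" user on the HMS host so group lookups succeed
useradd -M -s /sbin/nologin HTTP
id HTTP    # should now print uid/gid instead of "id: HTTP: no such user"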

Researching further, I realized that this was because, in the Hive configuration, hadoop.proxyuser.hive.groups was set to a list of groups rather than “*”, and “HTTP” was one of the groups in that list. You will not get this error if hadoop.proxyuser.hive.groups is set to “*”; it only fails when “HTTP” is added manually (it is required to be on the list if the value is not “*”, because the “hive” user needs to be able to impersonate the “HTTP” user for the request to work).

The reason for the failure is that when hadoop.proxyuser.hive.groups is set to “*”, Hive does not bother to check for the user’s existence, since every user is allowed. However, when a list of groups is defined here, Hive has to resolve the group membership of each user it impersonates, which requires that user to exist on the host where Hive runs. In our case, the “HTTP” user did not exist on the HMS host, so HMS failed with the error we saw earlier, and adding the user resolved the issue.
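
For reference, the relevant proxy-user settings live in core-site.xml (or the corresponding Cloudera Manager safety valve) and look roughly like below; the group names here are purely illustrative:

<property>
  <name>hadoop.proxyuser.hive.groups</name>
  <!-- must include every group containing users that hive impersonates -->
  <value>hive,HTTP,hue</value>
</property>
<property>
  <name>hadoop.proxyuser.hive.hosts</name>
  <value>*</value>
</property>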

Hope the above helps anyone who hits the same issue.

My Patch for SQOOP-3330 Committed

In the last few weeks, I have been working on an issue in Sqoop where the “--append” option does not work well with the parameter “-Dmapreduce.output.basename”. The goal of adding “-Dmapreduce.output.basename” is to ask Sqoop to generate custom file names for the target files in HDFS, rather than using the default “part” prefix for all files.

However, Sqoop has a bug: when moving the files from the temporary folder to the target folder in HDFS, it does not respect the setting for mapreduce.output.basename, which results in no data being moved to the final destination.

This can be confirmed by turning on DEBUG logging for Sqoop (using --verbose):

sqoop import -Dmapreduce.output.basename="eric-test" --connect jdbc:mysql://mysql-host.com/test --username root --password 'root' --table test --target-dir /tmp/ericlin-test/sqoop/test --fields-terminated-by '\t' --verbose --append

18/05/28 22:24:44 INFO util.AppendUtils: Appending to directory test
18/05/28 22:24:44 DEBUG util.AppendUtils: Filename: _SUCCESS ignored
18/05/28 22:24:44 DEBUG util.AppendUtils: Filename: eric-test-m-00000 ignored
18/05/28 22:24:44 DEBUG util.AppendUtils: Filename: eric-test-m-00001 ignored
18/05/28 22:24:44 DEBUG util.AppendUtils: Filename: eric-test-m-00002 ignored
18/05/28 22:24:44 DEBUG util.AppendUtils: Deleting temporary folder 14935e396acc4ea7b9a6236c66064c9b_test

From the output, you can see that Sqoop ignored all the files generated with the prefix “eric-test”.
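
Until the fix reaches your Sqoop version, one workaround is to drop “--append”, import into a staging directory, and move the custom-named files into place yourself; a sketch, with illustrative paths:

# Import into a staging dir (no --append), then move the files manually:
sqoop import -Dmapreduce.output.basename="eric-test" \
    --connect jdbc:mysql://mysql-host.com/test --username root --password 'root' \
    --table test --target-dir /tmp/ericlin-test/sqoop/staging \
    --fields-terminated-by '\t'
hdfs dfs -mv '/tmp/ericlin-test/sqoop/staging/eric-test-m-*' /tmp/ericlin-test/sqoop/test/
hdfs dfs -rm -r -skipTrash /tmp/ericlin-test/sqoop/staging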

I submitted an upstream JIRA, SQOOP-3330, and after a few review cycles my patch finally got accepted and committed to Sqoop’s trunk code base. I am looking forward to the fix being backported into CDH in the near future.

My Patch for SQOOP-3042 Committed

I have received a lot of complaints from Cloudera customers that after a Sqoop job finishes, the generated table class and Jar files are not cleaned up. By default, they are saved under /tmp/sqoop-{username}/compile, to be used by the currently running job. They are not needed after the job finishes, so they should be cleaned up.

The content of the directory looks like below:

[root@localhost ~]# ll /tmp/sqoop-hadoop/compile/
total 16
drwxrwxr-x. 2 hadoop hadoop 4096 Jun  6 08:56 1496d8f8400052af2a7d3ede2cfe496d
drwxrwxr-x. 2 hadoop hadoop 4096 Jun  6 08:45 6360b964ea0c1fdf6bf6aaed7a35b986
drwxrwxr-x. 2 hadoop hadoop 4096 Jun  6 08:45 d4ccb83934494ba2874b5c6d1b51d2ac
drwxrwxr-x. 2 hadoop hadoop 4096 Jun  6 08:50 df37a566defbfac477f6f309cf227dec

[root@localhost ~]# ll /tmp/sqoop-hadoop/compile/1496d8f8400052af2a7d3ede2cfe496d
total 56
-rw-rw-r--. 1 hadoop hadoop   620 Jun  6 08:56 SQOOP_3042$1.class
-rw-rw-r--. 1 hadoop hadoop   617 Jun  6 08:56 SQOOP_3042$2.class
-rw-rw-r--. 1 hadoop hadoop   620 Jun  6 08:56 SQOOP_3042$3.class
-rw-rw-r--. 1 hadoop hadoop   516 Jun  6 08:56 SQOOP_3042.avsc
-rw-rw-r--. 1 hadoop hadoop 10389 Jun  6 08:56 SQOOP_3042.class
-rw-rw-r--. 1 hadoop hadoop   237 Jun  6 08:56 SQOOP_3042$FieldSetterCommand.class
-rw-rw-r--. 1 hadoop hadoop  6063 Jun  6 08:56 SQOOP_3042.jar
-rw-rw-r--. 1 hadoop hadoop 12847 Jun  6 08:56 SQOOP_3042.java
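
If you are on a Sqoop version without the fix, a periodic manual cleanup is a simple interim workaround; a sketch, assuming the default location and a 7-day retention:

# Remove Sqoop compile dirs older than 7 days (adjust user/path/age as needed)
find /tmp/sqoop-hadoop/compile -mindepth 1 -maxdepth 1 -type d -mtime +7 \
    -exec rm -rf {} +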

I created an upstream JIRA, SQOOP-3042, to track and fix this back in November 2016. I provided a patch at the time, but it never got looked at due to a lack of reviewers.

After getting help from Cloudera Sqoop engineers in our Budapest team, I finally got the JIRA progressed in the last few weeks, and it was committed to Sqoop trunk yesterday. Details can be seen here: https://github.com/apache/sqoop/commit/0cfbf56713f7574568ea3754f6854e82f5540954

The fix adds a new command line option, “--delete-compile-dir”, so that users can instruct Sqoop to remove those temporary directories after the job finishes. The reason for adding an option, rather than changing the default, is to avoid changing Sqoop’s existing behaviour while still letting users opt in to the cleanup.

An example command would look like below:

sqoop import --connect jdbc:mysql://localhost/test --username root --password pass --table SQOOP_3042 --target-dir /tmp/erictest --delete-target-dir --verbose --delete-compile-dir

And you can see the message below in --verbose mode, verifying that the directory and files are removed:

....
18/06/06 17:39:27 INFO mapreduce.ImportJobBase: Transferred 52 bytes in 29.6139 seconds (1.7559 bytes/sec)
18/06/06 17:39:27 INFO mapreduce.ImportJobBase: Retrieved 4 records.
18/06/06 17:39:27 DEBUG util.ClassLoaderStack: Restoring classloader: sun.misc.Launcher$AppClassLoader@6f1fba17
18/06/06 17:39:28 DEBUG util.DirCleanupHook: Removing directory: /tmp/sqoop-hadoop/compile/a9d8a87bc02a5f823a82014c49516736 in the clean up hook.

Spark Job SASL Authentication Error

Spark has an internal mechanism that authenticates executors with the driver controlling a given application. This is enabled by setting “spark.authenticate” to “true” as part of spark-submit’s parameters, like below:

spark-submit --master yarn-cluster --conf spark.authenticate=true --conf spark.dynamicAllocation.enabled=true ....

This setting is required if you have “spark.authenticate.enableSaslEncryption” enabled, also called “Enable Network Encryption” in Cloudera Manager.

So if you have “Enable Network Encryption” enabled but do not pass “--conf spark.authenticate=true” for the Spark job, the job will fail with the error below:

6241 [dispatcher-event-loop-3] ERROR org.apache.spark.storage.BlockManager - 
Failed to connect to external shuffle server, will retry 1 more times after waiting 5 seconds...
java.lang.RuntimeException: java.lang.IllegalStateException: Expected SaslMessage, received something else (maybe your client does not have SASL enabled?)
	at org.apache.spark.network.sasl.SaslMessage.decode(SaslMessage.java:69)
	at org.apache.spark.network.sasl.SaslRpcHandler.receive(SaslRpcHandler.java:87)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:154)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:745)
	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:207)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:745)

To avoid passing “--conf spark.authenticate=true” every time you submit Spark jobs, consider setting it as the default for Spark. If you are using Cloudera Manager, navigate to CM > Spark > Configuration > “Spark Authentication”, tick it, save, and then deploy the client configuration. You might also need to restart services that depend on Spark, such as YARN.
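
If you manage Spark configuration by hand rather than through Cloudera Manager, the equivalent is to set the properties in spark-defaults.conf; a minimal sketch:

# /etc/spark/conf/spark-defaults.conf
spark.authenticate                       true
spark.authenticate.enableSaslEncryption  true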

If you are using Oozie to launch Spark jobs via the Spark Action, then you should also consider enabling the Spark dependency for Oozie, so that Oozie will pick up Spark’s default configuration from the /etc/spark/conf/spark-defaults.conf file.

To do so, navigate to CM > Oozie > Configuration > “Spark on Yarn Service”, select “Spark”, save, and then restart Oozie.

After the above changes, you should no longer need to pass the spark.authenticate parameter manually, either from spark-submit or from an Oozie Spark Action, as sketched below.
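
For reference, a bare-bones Oozie Spark action would then need no explicit spark.authenticate setting (all names and paths below are illustrative):

<action name="sparkTest">
  <spark xmlns="uri:oozie:spark-action:0.1">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <master>yarn-cluster</master>
    <name>spark-test</name>
    <class>com.example.MyApp</class>
    <jar>${nameNode}/user/${wf:user()}/lib/myapp.jar</jar>
  </spark>
  <ok to="end"/>
  <error to="kill"/>
</action>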