How to Disable Actions in Oozie

Oozie is an orchestration system for managing Hadoop jobs. It currently supports a variety of action types, including, but not limited to, Spark, Shell, SSH, Hive, Sqoop and Java. However, business requirements sometimes call for disabling certain actions so that we can control how users run jobs through Oozie.

For example, to use an SSH action you currently need to set up passwordless login for a particular user from the Oozie server host to the target host. As a side effect, any user who knows that username and the remote host name can set up a job to run on the remote machine, which is a security concern. There are ideas on how to improve this, but no solution exists at this stage.

So if the business has such concerns, the SSH action can be disabled easily. Please follow the steps below (assuming that you are using Cloudera Manager to manage CDH):

1. Go to Cloudera Manager home page > Oozie > Configuration
2. Locate configuration called “Oozie Server Advanced Configuration Snippet (Safety Valve) for oozie-site.xml”
3. Click the “Add” button, enter “oozie.service.ActionService.executor.classes” as the name, and set the value to the list below:

org.apache.oozie.action.decision.DecisionActionExecutor,org.apache.oozie.action.hadoop.JavaActionExecutor,org.apache.oozie.action.hadoop.FsActionExecutor,org.apache.oozie.action.hadoop.MapReduceActionExecutor,org.apache.oozie.action.hadoop.PigActionExecutor,org.apache.oozie.action.hadoop.HiveActionExecutor,org.apache.oozie.action.hadoop.ShellActionExecutor,org.apache.oozie.action.hadoop.SqoopActionExecutor,org.apache.oozie.action.hadoop.DistcpActionExecutor,org.apache.oozie.action.hadoop.Hive2ActionExecutor,org.apache.oozie.action.oozie.SubWorkflowActionExecutor,org.apache.oozie.action.email.EmailActionExecutor,org.apache.oozie.action.hadoop.SparkActionExecutor

For reference, the full default list of executor classes is:

org.apache.oozie.action.decision.DecisionActionExecutor,org.apache.oozie.action.hadoop.JavaActionExecutor,org.apache.oozie.action.hadoop.FsActionExecutor,org.apache.oozie.action.hadoop.MapReduceActionExecutor,org.apache.oozie.action.hadoop.PigActionExecutor,org.apache.oozie.action.hadoop.HiveActionExecutor,org.apache.oozie.action.hadoop.ShellActionExecutor,org.apache.oozie.action.hadoop.SqoopActionExecutor,org.apache.oozie.action.hadoop.DistcpActionExecutor,org.apache.oozie.action.hadoop.Hive2ActionExecutor,org.apache.oozie.action.ssh.SshActionExecutor,org.apache.oozie.action.oozie.SubWorkflowActionExecutor,org.apache.oozie.action.email.EmailActionExecutor,org.apache.oozie.action.hadoop.SparkActionExecutor

So the value entered in step 3 is simply this full list with the org.apache.oozie.action.ssh.SshActionExecutor class removed. In general, just remove the action classes that you do not want Oozie to support (see the example oozie-site.xml entry after these steps).

4. Save and restart Oozie
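
Once Oozie restarts, the generated oozie-site.xml should contain an entry like the sketch below; the value is abbreviated here because it is just the list from step 3:

<property>
    <name>oozie.service.ActionService.executor.classes</name>
    <!-- the value is the full executor class list from step 3, i.e. the
         default list with org.apache.oozie.action.ssh.SshActionExecutor removed -->
    <value>org.apache.oozie.action.decision.DecisionActionExecutor,...,org.apache.oozie.action.hadoop.SparkActionExecutor</value>
</property>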

After that, if you try to run an SSH action through Oozie, it will fail, and the error will be reported in Hue.

Please keep in mind that if you make this change, remember to review this list whenever you upgrade Oozie in the future, so that any newly supported action classes are added to it; otherwise jobs using those actions will fail. For example, Oozie currently only supports the Spark1 action and Spark2 is not supported, but in the latest CDH 6.x releases Spark2 is supported, so the list would need to be updated after such an upgrade.

Oozie SSH Action Does Not Support Chained Commands – OOZIE-1974

I have seen quite a few CDH users try to run chained Linux commands via Oozie’s SSH action. An example looks like below:

<action name="sshTest">
  <ssh xmlns="uri:oozie:ssh-action:0.1">
    <host>${sshUserHost}</host>
    <command>kinit test.keytab test@TEST.COM ; python ....</command>
    <capture-output/>
  </ssh>
  <ok to="nextAction"/>
  <error to="kill"/>
</action>

We can see that the command to run on the remote host is:

kinit test.keytab test@TEST.COM ; python ....

This is fine if both commands finish successfully very quickly. However, the SSH action will fail if the python command needs to run for a while, say more than 5-10 minutes. Below are example log messages produced in Oozie’s server log while the SSH action is running:

2018-08-13 10:01:48,215 WARN org.apache.oozie.command.wf.CompletedActionXCommand: SERVER[{oozie-host}] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000234-172707121423674-oozie-oozi-W] ACTION[0000234-172707121423674-oozie-oozi-W@sshTest] Received early callback for action still in PREP state; will wait [10,000]ms and requeue up to [5] more times
2018-08-13 10:01:48,216 WARN org.apache.oozie.command.wf.CompletedActionXCommand: SERVER[{oozie-host}] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000234-172707121423674-oozie-oozi-W] ACTION[0000234-172707121423674-oozie-oozi-W@sshTest] Received early callback for action still in PREP state; will wait [10,000]ms and requeue up to [5] more times

....

2018-08-13 10:02:38,243 ERROR org.apache.oozie.command.wf.CompletedActionXCommand: SERVER[{oozie-host}] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000234-172707121423674-oozie-oozi-W] ACTION[0000234-172707121423674-oozie-oozi-W@sshTest] XException, 
org.apache.oozie.command.CommandException: E0822: Received early callback for action [0000234-172707121423674-oozie-oozi-W@sshTest] while still in PREP state and exhausted all requeues
 at org.apache.oozie.command.wf.CompletedActionXCommand.execute(CompletedActionXCommand.java:114)
 at org.apache.oozie.command.wf.CompletedActionXCommand.execute(CompletedActionXCommand.java:39)
 at org.apache.oozie.command.XCommand.call(XCommand.java:286)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:179)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

The failure happens because Oozie currently does not support chained Linux commands in the SSH action, which is tracked in the upstream JIRA OOZIE-1974.

Here is what happens behind the scenes:

1. The ssh-base.sh and ssh-wrapper.sh files (from https://github.com/apache/oozie/blob/master/core/src/main/resources) are copied to the target host.

2. Oozie then runs the command below from the Oozie server, via ssh, directly against the target host:

sh ssh-base.sh FLATTEN_ARGS curl "http://{oozie-host}:11000/oozie/callback?id=0000234-172707121423674-oozie-oozi-W@sshTest&status=#status" \
"--data-binary%%%@#stdout%%%--request%%%POST%%%--header%%%\"content-type:text/plain\"" \
0000234-172707121423674-oozie-oozi-W@sshTest@3 kinit test.keytab test@TEST.COM ; python ....

3. Based on the command above, we can see that the original command has been rebuilt; because of the semicolon, the full command line is now broken into two separate commands:

sh ssh-base.sh FLATTEN_ARGS curl "http://{oozie-host}:11000/oozie/callback?id=0000234-172707121423674-oozie-oozi-W@sshTest&status=#status" \
"--data-binary%%%@#stdout%%%--request%%%POST%%%--header%%%\"content-type:text/plain\"" \
0000234-172707121423674-oozie-oozi-W@sshTest@3 kinit test.keytab test@TEST.COM

and

python ....

So the split is not between the original “kinit test.keytab test@TEST.COM” and “python ….” commands; instead, the kinit portion is absorbed into the ssh-base.sh invocation while the python command runs on its own.

4. The ssh-base.sh script will in turn run the command below:

sh ssh-wrapper.sh FLATTEN_ARGS curl "http://{oozie-host}:11000/oozie/callback?id=0000234-172707121423674-oozie-oozi-W@sshTest&status=#status" \
"--data-binary%%%@#stdout%%%--request%%%POST%%%--header%%%\"content-type:text/plain\"" \
0000234-172707121423674-oozie-oozi-W@sshTest@3 kinit test.keytab test@TEST.COM

This command finishes very quickly and triggers the callback curl call immediately. The “python” command, however, keeps the SSH job running until it completes, so the early callback arrives while the Oozie action is still in the PREP state. Because the Oozie action state and the SSH job state are inconsistent, the callback is requeued repeatedly and finally fails with E0822 once all requeues are exhausted, as shown in the log above.

So until OOZIE-1974 is fixed, the workaround is to put both commands inside a single script file and make that script available on the remote host, as shown in the sketch below.
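
For example, here is a minimal sketch of what the reworked action could look like, assuming the two commands have been moved into a script (the name /home/test/run_job.sh is hypothetical) that already exists and is executable on the remote host:

<action name="sshTest">
  <ssh xmlns="uri:oozie:ssh-action:0.1">
    <host>${sshUserHost}</host>
    <!-- run_job.sh is a hypothetical wrapper script on the remote host
         containing the kinit command followed by the python command -->
    <command>/home/test/run_job.sh</command>
    <capture-output/>
  </ssh>
  <ok to="nextAction"/>
  <error to="kill"/>
</action>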

Hope the above helps.

Spark Job SASL Authentication Error

Spark has an internal mechanism that authenticates executors with the driver controlling a given application. This can be controlled by setting “spark.authenticate” to “true”, as part of spark-submit’s parameters, like below:

spark-submit --master yarn-cluster --conf spark.authenticate=true --conf spark.dynamicAllocation.enabled=true ....

This setting is required if you have “spark.authenticate.enableSaslEncryption” enabled (called “Enable Network Encryption” in Cloudera Manager).

So if you have “Enable Network Encryption” enabled, but do not pass “--conf spark.authenticate=true” for the Spark job, the job will fail with the error below:

6241 [dispatcher-event-loop-3] ERROR org.apache.spark.storage.BlockManager - 
Failed to connect to external shuffle server, will retry 1 more times after waiting 5 seconds...
java.lang.RuntimeException: java.lang.IllegalStateException: Expected SaslMessage, received something else (maybe your client does not have SASL enabled?)
	at org.apache.spark.network.sasl.SaslMessage.decode(SaslMessage.java:69)
	at org.apache.spark.network.sasl.SaslRpcHandler.receive(SaslRpcHandler.java:87)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:154)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:745)
	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:207)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:745)

To avoid passing “--conf spark.authenticate=true” every time you submit a Spark job, you can set it as the default for Spark. If you are using Cloudera Manager, navigate to CM > Spark > Configuration > “Spark Authentication”, tick it, save, and then Deploy Client Configuration. You might also need to restart services that depend on Spark, such as YARN.

If you are using Oozie to launch Spark jobs via the Spark action, you should also consider enabling the Spark dependency for Oozie, so that Oozie will pick up Spark’s default configuration from the /etc/spark/conf/spark-defaults.conf file.

To do so, navigate to CM > Oozie > Configuration > “Spark on Yarn Service”, select “Spark”, then save and restart Oozie.

After the above changes, you should no longer need to pass the spark.authenticate parameter manually, either via spark-submit or via an Oozie Spark action.
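
For illustration, here is a hedged sketch of an Oozie Spark action after this change; the action name, class and jar path are made up, and note that spark-opts no longer carries spark.authenticate:

<action name="spark-test">
  <spark xmlns="uri:oozie:spark-action:0.1">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <master>yarn-cluster</master>
    <name>MySparkJob</name>
    <class>com.example.MySparkApp</class>
    <jar>${nameNode}/user/test/lib/my-spark-app.jar</jar>
    <!-- spark.authenticate is no longer passed here; it is picked up from
         /etc/spark/conf/spark-defaults.conf via the Oozie Spark dependency -->
    <spark-opts>--conf spark.dynamicAllocation.enabled=true</spark-opts>
  </spark>
  <ok to="end"/>
  <error to="kill"/>
</action>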

Oozie Workflow Failed with Error “stream exceeds limit”

Last week I was working with a customer to fix an issue where an Oozie SSH action failed with a “stream exceeds limit” error. This error does not appear in the Oozie launcher log, but rather in the Oozie server log. This means there was no problem launching the job; it failed when Oozie tried to parse the output sent back to it.

The full error message looks like this:

2018-06-13 02:24:38,879 WARN org.apache.oozie.servlet.CallbackServlet: 
SERVER[xxxx.xxxx.xxxx.com] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0557604-180517170833199-oozie-oozi-W] 
ACTION[0557604-180517170833199-oozie-oozi-W@ssh-4d02] 
URL[POST http://xxxx.xxxx.xxxx.com:11000/oozie/callback?id=0557604-180517170833199-oozie-oozi-W@ssh-4d02&status=ERROR] 
user error, stream exceeds limit [2,048]
java.lang.IllegalArgumentException: stream exceeds limit [2,048]
at org.apache.oozie.util.IOUtils.getReaderAsString(IOUtils.java:84)
at org.apache.oozie.servlet.CallbackServlet.doPost(CallbackServlet.java:117)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
at org.apache.oozie.servlet.JsonRestServlet.service(JsonRestServlet.java:289)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.oozie.servlet.HostnameFilter.doFilter(HostnameFilter.java:86)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:293)
at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:861)
at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:612)
at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:503)
at java.lang.Thread.run(Thread.java:748)

By checking the source code, I found that it fails right here in CallbackServlet.java:

    /**
     * POST callback
     */
    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
            IOException {
        String queryString = request.getQueryString();
        CallbackService callbackService = Services.get().get(CallbackService.class);

        if (!callbackService.isValid(queryString)) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
        }

        String actionId = callbackService.getActionId(queryString);
        if (actionId == null) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
        }
        log = XLog.getLog(getClass());
        setLogInfo(actionId);
        log.debug("Received a CallbackServlet.doPost() with query string " + queryString);

        validateContentType(request, RestConstants.TEXT_CONTENT_TYPE);
        try {
            log.info(XLog.STD, "callback for action [{0}]", actionId);
            String data = IOUtils.getReaderAsString(request.getReader(), maxDataLen); // Failed here
.....

The maxDataLen field is initialized in the servlet’s init() method:

    @Override
    public void init() {
        maxDataLen = ConfigurationService.getInt(CONF_MAX_DATA_LEN); // maxDataLen defined here
    }

and that configuration key is defined as:

    public final static String CONF_MAX_DATA_LEN = "oozie.servlet.CallbackServlet.max.data.len";

Following this through, we can find that oozie.servlet.CallbackServlet.max.data.len has a default value of 2048:

    <property>
        <name>oozie.servlet.CallbackServlet.max.data.len</name>
        <value>2048</value>
        <description>
            Max size in characters for the action completion data output.
        </description>
    </property>

This matches the error we saw earlier:

java.lang.IllegalArgumentException: stream exceeds limit [2,048]

From here it is pretty clear that the configuration we need to change is oozie.servlet.CallbackServlet.max.data.len in the oozie-site.xml file. If you are using Cloudera Manager, follow the steps below:

1. Go to Cloudera Manager > Oozie > Configuration > “Oozie Server Advanced Configuration Snippet (Safety Valve) for oozie-site.xml” and enter:

<property>
    <name>oozie.servlet.CallbackServlet.max.data.len</name>
    <value>8192</value>
</property>

2. Save and restart Oozie.

This extends the CallbackServlet data limit to 8K characters, from the default of 2K, which should be enough in most cases. If it still fails, you should investigate why your application produces so much output, since Oozie will try to capture all of it and store it in its database.

Hope the above helps.

Oozie Spark Action Not Loading Spark Configurations

Recently I was working on an issue where Oozie was not able to pick up Spark’s configuration, which caused jobs to fail. I knew it was not loading Spark’s configuration because Spark had “spark.authenticate=true” set in its configuration file, /etc/spark/conf/spark-defaults.conf.

$ head /etc/spark/conf/spark-defaults.conf
spark.authenticate=true
spark.authenticate.enableSaslEncryption=false
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.minExecutors=0
....

I also confirmed that the Oozie job failure could be resolved by adding “--conf spark.authenticate=true” to the workflow.xml file (see the sketch below). In theory, if Spark already has the setting, Oozie should just pick it up.
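
For reference, that temporary workaround looked roughly like the snippet below inside the workflow’s Spark action; only the relevant element is shown and the surrounding elements are omitted:

  <spark xmlns="uri:oozie:spark-action:0.1">
    ...
    <!-- temporary workaround: pass the setting explicitly until Oozie
         is configured to load spark-defaults.conf -->
    <spark-opts>--conf spark.authenticate=true</spark-opts>
    ...
  </spark>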

By checking Oozie’s configuration file oozie-site.xml, I noticed that the setting required to load the Spark configuration was missing: oozie.service.SparkConfigurationService.spark.configurations. Without this setting, Oozie cannot load Spark’s settings and apply them to Spark action jobs.

The remedy is easy if you are using Cloudera Manager; simply go to:

Cloudera Manager > Oozie > Configuration > search for “Spark on Yarn Service”

Then select “Spark” instead of “none” and restart Oozie.

After the restart, you can check the oozie-site.xml file for the Oozie process and confirm that the following configuration is present:

<property>
    <name>oozie.service.SparkConfigurationService.spark.configurations</name>
    <value>*=/etc/spark/conf</value>
</property>

After the above change, Oozie should pick up Spark’s default configuration automatically, without the need to specify it manually for every Spark action.