Spark Job SASL Authentication Error

Spark has an internal mechanism that authenticates executors with the driver controlling a given application. It can be enabled by setting “spark.authenticate” to “true”, for example as part of spark-submit’s parameters, like below:

spark-submit --master yarn-cluster --conf spark.authenticate=true --conf spark.dynamicAllocation.enabled=true ....

This setting is required if you have “spark.authenticate.enableSaslEncryption” enabled, which is called “Enable Network Encryption” in Cloudera Manager.

So if you have “Enable Network Encryption” enabled, but do not pass “--conf spark.authenticate=true” for your Spark job, the job will fail with the error below:

6241 [dispatcher-event-loop-3] ERROR org.apache.spark.storage.BlockManager - 
Failed to connect to external shuffle server, will retry 1 more times after waiting 5 seconds...
java.lang.RuntimeException: java.lang.IllegalStateException: Expected SaslMessage, received something else (maybe your client does not have SASL enabled?)
	at org.apache.spark.network.sasl.SaslMessage.decode(SaslMessage.java:69)
	at org.apache.spark.network.sasl.SaslRpcHandler.receive(SaslRpcHandler.java:87)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:154)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:745)
	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:207)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:745)

To avoid passing “--conf spark.authenticate=true” every time you submit a Spark job, you can consider setting it as a default for Spark. If you are using Cloudera Manager, navigate to CM > Spark > Configuration > “Spark Authentication”, tick it, save, and then Deploy Client Configuration. You might also need to restart services that depend on Spark, like YARN, for example.
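Once the client configuration is deployed, the setting should appear in the client configuration file on the gateway hosts, which you can verify like this:

$ grep "^spark.authenticate=" /etc/spark/conf/spark-defaults.conf
spark.authenticate=true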

If you are using Oozie to launch Spark jobs via the Spark Action, then you should also consider enabling the Spark dependency for Oozie, so that Oozie will pick up Spark’s default configurations from the /etc/spark/conf/spark-defaults.conf file.

To do so, navigate to CM > Oozie > Configuration > “Spark on Yarn Service”, select “Spark”, save, and then restart Oozie.
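With the dependency in place, a Spark action no longer needs to carry the flag itself. Below is a minimal sketch of such an action (the action name, class, and jar path are made-up placeholders):

<action name="spark-node">
    <spark xmlns="uri:oozie:spark-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <master>yarn-cluster</master>
        <name>MySparkJob</name>
        <class>com.example.MyApp</class>
        <jar>${nameNode}/user/oozie/apps/myapp.jar</jar>
        <!-- no spark-opts entry for spark.authenticate is needed here;
             Oozie picks it up from /etc/spark/conf/spark-defaults.conf -->
    </spark>
    <ok to="end"/>
    <error to="fail"/>
</action>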

After the above changes, you should no longer need to pass the spark.authenticate parameter manually, either from spark-submit or from an Oozie Spark Action.

Oozie Workflow Failed with Error “stream exceeds limit”

Last week I was working with a customer to fix an issue where an Oozie SSH action failed with a “stream exceeds limit” error. This error does not appear in the Oozie launcher log, but rather in the Oozie server log. This means there was no problem launching the job; rather, it failed when Oozie tried to parse the output from the launcher.

The full error message looks like below:

2018-06-13 02:24:38,879 WARN org.apache.oozie.servlet.CallbackServlet: 
SERVER[xxxx.xxxx.xxxx.com] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0557604-180517170833199-oozie-oozi-W] 
ACTION[0557604-180517170833199-oozie-oozi-W@ssh-4d02] 
URL[POST http://xxxx.xxxx.xxxx.com:11000/oozie/callback?id=0557604-180517170833199-oozie-oozi-W@ssh-4d02&status=ERROR] 
user error, stream exceeds limit [2,048]
java.lang.IllegalArgumentException: stream exceeds limit [2,048]
at org.apache.oozie.util.IOUtils.getReaderAsString(IOUtils.java:84)
at org.apache.oozie.servlet.CallbackServlet.doPost(CallbackServlet.java:117)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
at org.apache.oozie.servlet.JsonRestServlet.service(JsonRestServlet.java:289)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.oozie.servlet.HostnameFilter.doFilter(HostnameFilter.java:86)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:293)
at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:861)
at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:612)
at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:503)
at java.lang.Thread.run(Thread.java:748)

By checking the source code, I found that it failed right here in CallbackServlet.java:

    /**
     * POST callback
     */
    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
            IOException {
        String queryString = request.getQueryString();
        CallbackService callbackService = Services.get().get(CallbackService.class);

        if (!callbackService.isValid(queryString)) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
        }

        String actionId = callbackService.getActionId(queryString);
        if (actionId == null) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0402, queryString);
        }
        log = XLog.getLog(getClass());
        setLogInfo(actionId);
        log.debug("Received a CallbackServlet.doPost() with query string " + queryString);

        validateContentType(request, RestConstants.TEXT_CONTENT_TYPE);
        try {
            log.info(XLog.STD, "callback for action [{0}]", actionId);
            String data = IOUtils.getReaderAsString(request.getReader(), maxDataLen); // Failed here
.....

And maxDataLen is set earlier in the class, in the servlet’s init() method:

    @Override
    public void init() {
        maxDataLen = ConfigurationService.getInt(CONF_MAX_DATA_LEN); // maxDataLen defined here
    }

which is defined here:

    public final static String CONF_MAX_DATA_LEN = "oozie.servlet.CallbackServlet.max.data.len";

Following the path, we can find that oozie.servlet.CallbackServlet.max.data.len has a default value of 2048, defined in oozie-default.xml:

    <property>
        <name>oozie.servlet.CallbackServlet.max.data.len</name>
        <value>2048</value>
        <description>
            Max size in characters for the action completion data output.
        </description>
    </property>

This matches the error we saw earlier:

java.lang.IllegalArgumentException: stream exceeds limit [2,048]

So from here, it is pretty clear that the config we need to change is oozie.servlet.CallbackServlet.max.data.len in the oozie-site.xml file. If you are using Cloudera Manager, please follow the steps below:

1. Go to Cloudera Manager > Oozie > Configuration > “Oozie Server Advanced Configuration Snippet (Safety Valve) for oozie-site.xml” and enter:

<property>
    <name>oozie.servlet.CallbackServlet.max.data.len</name>
    <value>8192</value>
</property>

2. Save and restart Oozie.

This extends the CallbackServlet limit from the default of 2K to 8K, which should be enough in most cases. If it still fails, you had better find out why your application produces so much output, since Oozie will try to capture and store all of it in its database.
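If raising the limit is not appropriate, the other direction is to trim what the action prints. A rough sketch for the remote script that an SSH action runs (the script name and log path are hypothetical):

#!/bin/bash
# Send the verbose application output to a file on the remote host;
# Oozie only captures what is written to stdout.
./run_etl.sh > /var/log/myapp/run_etl.log 2>&1
# Print only the short key=value lines needed for <capture-output/>.
echo "status=OK"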

Hope the above helps.

Oozie Spark Action Not Loading Spark Configurations

Recently I was working on an issue where Oozie was not able to pick up Spark’s configuration, causing the job to fail. The reason I knew it was not loading Spark’s configuration was that Spark already had “spark.authenticate=true” set in its configuration file, /etc/spark/conf/spark-defaults.conf:

$ head /etc/spark/conf/spark-defaults.conf
spark.authenticate=true
spark.authenticate.enableSaslEncryption=false
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.minExecutors=0
....

And I confirmed that the Oozie job failure could be resolved by adding “--conf spark.authenticate=true” to the workflow.xml file, as shown in the snippet below. In theory, if Spark already has the setting, then Oozie should just pick it up.
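For reference, the manual workaround goes into the <spark-opts> element of the Spark action (only the relevant element is shown):

<spark-opts>--conf spark.authenticate=true</spark-opts>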

By checking Oozie’s configuration file oozie-site.xml, I noticed that the setting required to load Spark configuration was missing: oozie.service.SparkConfigurationService.spark.configurations. Without this setting, Oozie is not able to load Spark’s settings and apply them to Spark Action jobs.

The remedy is easy if you are using Cloudera Manager. Simply go to:

Cloudera Manager > Oozie > Configuration > search for “Spark on Yarn Service”

Then select “Spark” instead of “none” and restart Oozie.

After the restart, you can check the oozie-site.xml file under Oozie’s process directory and confirm that the config below is present:

<property>
    <name>oozie.service.SparkConfigurationService.spark.configurations</name>
    <value>*=/etc/spark/conf</value>
</property>
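On a Cloudera Manager managed cluster, the live oozie-site.xml sits under the agent’s process directory, so a quick way to check is something like the following (the exact process directory name varies per instance):

$ grep -A1 "SparkConfigurationService" /var/run/cloudera-scm-agent/process/*-oozie-OOZIE_SERVER/oozie-site.xml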

After the above change, Oozie should pick up Spark’s default configurations automatically, without the need to specify them manually for every Spark Action.

Oozie Hive2 Action Failed with Error: “HiveSQLException: Failed to execute session hooks”

If you have an Oozie Hive2 action that fails randomly with the error message below, which can be found in Oozie’s server log (located by default under /var/log/oozie):

2018-06-02 09:00:01,103 WARN org.apache.oozie.action.hadoop.Hive2Credentials: SERVER[hlp3058p.oocl.com] 
USER[dmsa_appln] GROUP[-] TOKEN[] APP[DMSA_CMTX_PCON_ETL_ONLY] JOB[0010548-180302135253124-oozie-oozi-W] 
ACTION[0010548-180302135253124-oozie-oozi-W@spark-6799] Exception in addtoJobConf
org.apache.hive.service.cli.HiveSQLException: Failed to execute session hooks
        at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:241)
        at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:232)
        at org.apache.hive.jdbc.HiveConnection.openSession(HiveConnection.java:491)
        at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:181)
        at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
        at java.sql.DriverManager.getConnection(DriverManager.java:571)
        at java.sql.DriverManager.getConnection(DriverManager.java:233)
        at org.apache.oozie.action.hadoop.Hive2Credentials.addtoJobConf(Hive2Credentials.java:66)
        at org.apache.oozie.action.hadoop.JavaActionExecutor.setCredentialTokens(JavaActionExecutor.java:1213)
        at org.apache.oozie.action.hadoop.JavaActionExecutor.submitLauncher(JavaActionExecutor.java:1063)
        at org.apache.oozie.action.hadoop.JavaActionExecutor.start(JavaActionExecutor.java:1295)
        at org.apache.oozie.command.wf.ActionStartXCommand.execute(ActionStartXCommand.java:232)
        at org.apache.oozie.command.wf.ActionStartXCommand.execute(ActionStartXCommand.java:63)
        at org.apache.oozie.command.XCommand.call(XCommand.java:286)
        at org.apache.oozie.service.CallableQueueService$CompositeCallable.call(CallableQueueService.java:332)
        at org.apache.oozie.service.CallableQueueService$CompositeCallable.call(CallableQueueService.java:261)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:179)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hive.service.cli.HiveSQLException: Failed to execute session hooks
        at org.apache.hive.service.cli.session.SessionManager.openSession(SessionManager.java:308)
        at org.apache.hive.service.cli.CLIService.openSession(CLIService.java:178)
        at org.apache.hive.service.cli.thrift.ThriftCLIService.getSessionHandle(ThriftCLIService.java:422)
        at org.apache.hive.service.cli.thrift.ThriftCLIService.OpenSession(ThriftCLIService.java:316)
        at org.apache.hive.service.cli.thrift.TCLIService$Processor$OpenSession.getResult(TCLIService.java:1253)
        at org.apache.hive.service.cli.thrift.TCLIService$Processor$OpenSession.getResult(TCLIService.java:1238)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge$Server$TUGIAssumingProcessor.process(HadoopThriftAuthBridge.java:746)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
        ... 3 more
Caused by: java.lang.IllegalStateException: zip file closed
        at java.util.zip.ZipFile.ensureOpen(ZipFile.java:634)
        at java.util.zip.ZipFile.getEntry(ZipFile.java:305)
        at java.util.jar.JarFile.getEntry(JarFile.java:227)
        at sun.net.www.protocol.jar.URLJarFile.getEntry(URLJarFile.java:128)
        at sun.net.www.protocol.jar.JarURLConnection.connect(JarURLConnection.java:132)
        at sun.net.www.protocol.jar.JarURLConnection.getInputStream(JarURLConnection.java:150)
        at java.net.URLClassLoader.getResourceAsStream(URLClassLoader.java:233)
        at javax.xml.parsers.SecuritySupport$4.run(SecuritySupport.java:94)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.xml.parsers.SecuritySupport.getResourceAsStream(SecuritySupport.java:87)
        at javax.xml.parsers.FactoryFinder.findJarServiceProvider(FactoryFinder.java:283)
        at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:255)
        at javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:121)
        at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2526)
        at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2513)
        at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2409)
        at org.apache.hadoop.conf.Configuration.get(Configuration.java:982)
        at org.apache.sentry.binding.hive.conf.HiveAuthzConf.<init>(HiveAuthzConf.java:162)
        at org.apache.sentry.binding.hive.HiveAuthzBindingHook.loadAuthzConf(HiveAuthzBindingHook.java:131)
        at org.apache.sentry.binding.hive.HiveAuthzBindingSessionHook.run(HiveAuthzBindingSessionHook.java:108)
        at org.apache.hive.service.cli.session.SessionManager.executeSessionHooks(SessionManager.java:420)
        at org.apache.hive.service.cli.session.SessionManager.openSession(SessionManager.java:300)
        ... 12 more

It is likely that you are hitting a possible issue with the JDK. Please refer to HADOOP-13809 for details. There is no proof at this stage that it is a JDK bug, but the workaround is at the JDK level. As mentioned in the JIRA, you can add the parameter below to HiveServer2’s Java options:

-Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl

If you are using Cloudera Manager, you can go to:

CM > Hive > Configuration > Search “Java configuration options for HiveServer2”

and append the above parameter to the end of the existing string (don’t forget the extra space before it), as in the example below.
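For example, assuming the existing value only sets the heap size (the -Xmx value here is a made-up placeholder), the final string would look like this:

-Xmx4g -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl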

Then restart HiveServer2 through CM. This should help to avoid the issue.

Impala Query Failed with ERROR “AnalysisException: ORDER BY expression not produced by aggregation output”

Recently, I discovered a bug in Impala where, if you use an expression in the ORDER BY clause, the query will fail with the error message below:

ERROR: AnalysisException: ORDER BY expression not produced by aggregation output (missing from GROUP BY clause?): (CASE WHEN TRUE THEN 1 ELSE a END)

The customer used a very complicated query, and I managed to simplify it to something like below:

DROP TABLE IF EXISTS test;
CREATE TABLE test (a int);

SELECT (
    CASE
        WHEN (1 = 1)
        THEN 1
        ELSE a
    END) AS b
FROM test
GROUP BY 1
ORDER BY (
    CASE
        WHEN (1 = 1)
        THEN 1
        ELSE a
    END);

This can be reproduced from CDH 5.13.x onwards. Since I can also reproduce it in the latest CDH 5.15.x at the time of writing, I went ahead and created a bug report in the upstream JIRA: IMPALA-7083.

As mentioned in the JIRA, the workaround is to disable ENABLE_EXPR_REWRITES via:

SET ENABLE_EXPR_REWRITES=false;

This option is on by default in the latest releases.

Another workaround, which is a better approach in my opinion, is to replace the ORDER BY expression with the ordinal position of the column:

DROP TABLE IF EXISTS test;
CREATE TABLE test (a int);

SELECT (
    CASE
        WHEN (1 = 1)
        THEN 1
        ELSE a
    END) AS b
FROM test
GROUP BY 1
ORDER BY 1;

This also makes the query simpler and easier to read.

However, lots of users run queries through third-party software like SAS and have no control over the query generation; in that case, setting ENABLE_EXPR_REWRITES to false is the way to go.
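If the setting has to apply to every session without touching the client tools, one option (a sketch based on standard Impala startup flags; verify on your version first) is to make it a default query option for the Impala daemons by adding the flag below to the impalad command line arguments (in Cloudera Manager, the Impala Daemon command line argument safety valve) and restarting Impala:

-default_query_options=enable_expr_rewrites=false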

Hope the above helps.