Spark Job SASL Authentication Error

Spark has an internal mechanism that authenticates executors with the driver controlling a given application. It is enabled by setting "spark.authenticate" to "true", for example as a spark-submit parameter, like below:

spark-submit --master yarn-cluster --conf spark.authenticate=true --conf spark.dynamicAllocation.enabled=true ....

This setting is required if you have "spark.authenticate.enableSaslEncryption" enabled, which is called "Enable Network Encryption" in Cloudera Manager.
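
For reference, a submission that enables both settings at once would look roughly like the one below. This is only a sketch: the class name and application JAR are placeholders, and the remaining options mirror the earlier example.

# Placeholder class and JAR; enables both RPC authentication and SASL network encryption
spark-submit \
  --master yarn-cluster \
  --conf spark.authenticate=true \
  --conf spark.authenticate.enableSaslEncryption=true \
  --conf spark.dynamicAllocation.enabled=true \
  --class com.example.MyApp \
  /path/to/my-app.jar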

So if you have "Enable Network Encryption" turned on, but do not pass "--conf spark.authenticate=true" for the Spark job, the job will fail with the error below:

6241 [dispatcher-event-loop-3] ERROR org.apache.spark.storage.BlockManager - 
Failed to connect to external shuffle server, will retry 1 more times after waiting 5 seconds...
java.lang.RuntimeException: java.lang.IllegalStateException: Expected SaslMessage, received something else (maybe your client does not have SASL enabled?)
	at org.apache.spark.network.sasl.SaslMessage.decode(SaslMessage.java:69)
	at org.apache.spark.network.sasl.SaslRpcHandler.receive(SaslRpcHandler.java:87)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:154)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:745)
	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:207)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:745)

To avoid passing "--conf spark.authenticate=true" every time you submit a Spark job, you can consider setting it as a default for Spark. If you are using Cloudera Manager, navigate to CM > Spark > Configuration > "Spark Authentication", tick it, save, and then Deploy Client Configuration. You might also need to restart services that depend on Spark, such as YARN.
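
A quick way to verify that the deployed client configuration picked up the change is to grep the client copy of spark-defaults.conf on a gateway host, a simple check assuming the standard CDH client configuration path:

# Should print spark.authenticate=true once the client configuration has been deployed
grep spark.authenticate /etc/spark/conf/spark-defaults.conf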

If you are using Oozie to launch Spark jobs via the Spark Action, then you should also consider enabling the Spark dependency for Oozie, so that Oozie will pick up Spark's default configurations from the /etc/spark/conf/spark-defaults.conf file.

To do so, navigate to CM > Oozie > Configuration > "Spark on Yarn Service", select "Spark", save, and then restart Oozie.

After the above changes, you should no longer need to pass the spark.authenticate parameter manually, either via spark-submit or through an Oozie Spark Action.

Oozie Spark Action Not Loading Spark Configurations

Recently I was working on an issue where Oozie was not able to pick up Spark's configuration, which caused the job to fail. I knew the Spark configuration was not being loaded because Spark already had "spark.authenticate=true" set in its configuration file, /etc/spark/conf/spark-defaults.conf:

$ head /etc/spark/conf/spark-defaults.conf
spark.authenticate=true
spark.authenticate.enableSaslEncryption=false
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.minExecutors=0
....

And I confirmed that the Oozie job failure could be resolved by adding "--conf spark.authenticate=true" to the workflow.xml file. In theory, if Spark already has the setting, then Oozie should just pick it up.

By checking Oozie's configuration file oozie-site.xml, I noticed that the setting required for loading Spark's configuration was missing: oozie.service.SparkConfigurationService.spark.configurations. Without this setting, Oozie is not able to load Spark's defaults and apply them to Spark Action jobs.

To remedy this, if you are using Cloudera Manager, simply go to:

Cloudera Manager > Oozie > Configuration > search for “Spark on Yarn Service”

Then select “Spark” instead of “none” and restart Oozie.

After the restart, you can check the oozie-site.xml file in Oozie's process directory and confirm that the configuration below is present:

<property>
    <name>oozie.service.SparkConfigurationService.spark.configurations</name>
    <value>*=/etc/spark/conf</value>
</property>
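
If you prefer the command line, you can also grep the generated oozie-site.xml inside the Oozie server's runtime process directory. This is just a sketch; the process directory naming pattern is an assumption and may vary between Cloudera Manager releases:

# Pick the most recent Oozie server process directory and check for the Spark config service setting
OOZIE_DIR=$(ls -dt /var/run/cloudera-scm-agent/process/*OOZIE_SERVER* | head -1)
grep -A1 "SparkConfigurationService" "$OOZIE_DIR/oozie-site.xml"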

After the above change, Oozie should pick up Spark's default configurations automatically, without the need to specify them manually for every Spark Action.

Oozie Spark Actions Fail with Error "Spark config without '=': --conf"

Oozie provides an easy interface for Spark1 jobs via the Spark1 action, so that users do not have to embed spark-submit inside a shell action. However, I recently discovered a bug in the way Oozie parses Spark configurations, which results in an incorrectly generated spark-submit command. By checking Oozie's launcher stderr.log, I found the error below:

Error: Spark config without '=': --conf
Run with --help for usage help or --verbose for debug output
Intercepting System.exit(1)
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], exit code [1]

Also, the stdout.log shows the incorrectly generated arguments for the Spark command:

  --conf
  spark.yarn.security.tokens.hive.enabled=false
  --conf
  --conf
  spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*:$PWD/*
  --conf
  spark.driver.extraClassPath=$PWD/*

You can see that Oozie generated a duplicate "--conf" in the Spark command, which explains the earlier error about "Spark config without '=': --conf".

This is caused by a known issue reported upstream: OOZIE-2923.

It is a bug on the Oozie side in parsing the following configs:

--conf spark.executor.extraClassPath=...
--conf spark.driver.extraClassPath=...

The workaround is to remove the "--conf" in front of the first instance of spark.executor.extraClassPath, so that it will be added by Oozie instead. For example, if you have the following:

<spark-opts>
--files /etc/hive/conf/hive-site.xml 
--driver-memory 4G 
--executor-memory 2G 
... 
--conf spark.yarn.security.tokens.hive.enabled=false 
--conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*
</spark-opts>

Simply remove the first "--conf" before spark.executor.extraClassPath, so it becomes:

<spark-opts>
--files /etc/hive/conf/hive-site.xml 
--driver-memory 4G 
--executor-memory 2G 
... 
--conf spark.yarn.security.tokens.hive.enabled=false  
spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*
</spark-opts>

This will allow you to avoid the issue.

However, the downside is that if you later upgrade to a version of CDH that contains the fix for this issue, you will need to add the "--conf" back.

OOZIE-2923 affects CDH 5.10.x, CDH 5.11.0 and CDH 5.11.1; CDH 5.11.2, CDH 5.12.x and later contain the fix.

Hive CLI Prints SLF4J Error to Standard Output

If you have both Hive and Spark running on the same cluster, chances are that Hive CLI will produce the following WARNING messages upon exit of each session:

WARN: The method class org.apache.commons.logging.impl.SLF4JLogFactory#release() was invoked. 
WARN: Please see http://www.slf4j.org/codes.html#release for an explanation.

A sample of the full output looks like this:

[root@localhost ~]# hive -e "show tables;"

Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/jars/hive-common-1.1.0-cdh5.12.1.jar!/hive-log4j.properties
OK
sample_07
sample_08
web_logs
Time taken: 1.281 seconds, Fetched: 3 row(s)
WARN: The method class org.apache.commons.logging.impl.SLF4JLogFactory#release() was invoked.
WARN: Please see http://www.slf4j.org/codes.html#release for an explanation.

Even though Hive CLI has been deprecated in CDH, there are still plenty of enterprise users stuck with Hive CLI for legacy reasons, and it is not easy for them to migrate to Beeline or to query Hive through ODBC or JDBC in their applications.

This is not an issue if you just run Hive CLI from the command line and view the output. However, if you want to capture the result set from Hive CLI's stdout, it becomes a problem. See the test case below:

[root@localhost ~]# output=`hive -e "show tables;"`

Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/jars/hive-common-1.1.0-cdh5.12.1.jar!/hive-log4j.properties
OK
Time taken: 1.281 seconds, Fetched: 3 row(s)

When you echo the variable $output, it contains the WARNING messages:

[root@localhost ~]# echo $output
sample_07
sample_08
web_logs
WARN: The method class org.apache.commons.logging.impl.SLF4JLogFactory#release() was invoked.
WARN: Please see http://www.slf4j.org/codes.html#release for an explanation.

And if your application then consumes this output as input for further processing, those extra WARN lines will cause trouble.

This happens because of the code below in Hive's launch script (/opt/cloudera/parcels/CDH/lib/hive/bin/hive), which loads Spark's assembly JAR into Hive's CLASSPATH:

# add Spark assembly jar to the classpath
if [[ -n "$SPARK_HOME" && !("$HIVE_SKIP_SPARK_ASSEMBLY" = "true") ]]
then
  sparkAssemblyPath=`ls ${SPARK_HOME}/lib/spark-assembly-*.jar`
  CLASSPATH="${CLASSPATH}:${sparkAssemblyPath}"
fi

Luckily, starting from CDH 5.12.0, Cloudera has backported the upstream JIRA HIVE-12179, which adds a check for an environment variable called "HIVE_SKIP_SPARK_ASSEMBLY". We can use this variable to disable the loading of the Spark assembly JAR for Hive CLI if you do not need to use Hive on Spark.

So the workaround is as simple as setting "HIVE_SKIP_SPARK_ASSEMBLY" to "true" so that the "if" block is skipped. See the example below:

[root@localhost ~]# output=`export HIVE_SKIP_SPARK_ASSEMBLY=true; hive -e "show tables;"`

Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/jars/hive-common-1.1.0-cdh5.12.1.jar!/hive-log4j.properties
OK
Time taken: 1.281 seconds, Fetched: 3 row(s)

The output will then be clean:

[root@localhost ~]# echo $output
sample_07
sample_08
web_logs
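
If you rely on Hive CLI regularly, one option is to set the variable once in your shell profile rather than on every command. A minimal sketch, assuming a bash login shell:

# Add to ~/.bashrc (or an equivalent profile script) so every Hive CLI session skips the Spark assembly
export HIVE_SKIP_SPARK_ASSEMBLY=true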

This workaround will not work if you need to use Hive on Spark from Hive CLI, because it effectively disables the loading of the Spark JARs. And of course, using Hive CLI is strongly NOT recommended; migrating to Beeline or using ODBC/JDBC to connect to HiveServer2 is the right way to go in the long run.

Hope the above information helps.

Spark History Server Keeps Crashing with OutOfMemory Errors

This article explains what to do when you are unable to start the Spark History Server because it keeps crashing with OutOfMemory errors after the cluster has been running Spark for some time.

To confirm that the Spark History Server keeps failing with an OutOfMemory error on startup, we can check its runtime process directory under /var/run/cloudera-scm-agent/process if you are using the CDH version of Spark. The stdout.log file contains the following error:

#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="/usr/lib64/cmf/service/common/killparent.sh"
# Executing /bin/sh -c "/usr/lib64/cmf/service/common/killparent.sh"...
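
To locate that stdout.log quickly on a Cloudera Manager managed host, something like the following usually works; the process directory naming pattern below is an assumption and may differ between releases:

# Find the most recent Spark History Server process directory and show the end of its stdout.log
HS_DIR=$(ls -dt /var/run/cloudera-scm-agent/process/*HISTORY_SERVER* | head -1)
tail -50 "$HS_DIR/logs/stdout.log"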

One possible reason for the failure is large job history files under /user/spark/sparkApplicationHistory (or /user/spark/spark2ApplicationHistory for Spark2). On startup, the Spark History Server tries to load those files into memory, and if the history files are too big, the server will crash with an OutOfMemory error unless its heap size is increased through the Cloudera Manager interface.

To confirm this, just run:

hdfs dfs -ls /user/spark/sparkApplicationHistory

or

hdfs dfs -ls /user/spark/spark2ApplicationHistory

and see whether any files are hundreds of MBs in size; if so, they are the likely culprits.
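
To quickly spot the largest files, something like the following can help (a sketch; hdfs dfs -du reports sizes in bytes, and you can swap in the Spark1 directory as needed):

# Show the 10 largest Spark2 history files, sorted by size in bytes
hdfs dfs -du /user/spark/spark2ApplicationHistory | sort -n | tail -10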

You might also notice that some files have a ".inprogress" extension, like below:

/user/spark/spark2ApplicationHistory/application_1503451614878_0337.inprogress

Those files can become stale in the HDFS directory when a Spark job fails prematurely and Spark does not get a chance to clean them up. Since the Spark History Server has no way of knowing whether those files were left over from a failed Spark job or are still being written, it simply loads everything under the HDFS directory into memory, which will cause a failure if the files are too big.

Spark has a cleanup job that removes files older than a pre-defined time period; however, it does not remove stale .inprogress files. This issue was reported in SPARK-8617.

Once SPARK-8617 is fixed, we should not see those stale .inprogress files anymore, but at the time of writing the fix has not been backported into CDH yet.

For now, we simply need to delete the files under the /user/spark/sparkApplicationHistory or /user/spark/spark2ApplicationHistory directory in HDFS that are older than, say, one week, so that the big, stale files are cleaned up.
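
A rough cleanup sketch is shown below. It assumes GNU date and the default output format of hdfs dfs -ls, and it bypasses the trash; do a dry run first by replacing the rm line with an echo:

# Delete Spark2 history files older than 7 days; use the Spark1 directory on Spark1 clusters
CUTOFF=$(date -d "-7 days" +%Y-%m-%d)
hdfs dfs -ls /user/spark/spark2ApplicationHistory | \
  awk -v cutoff="$CUTOFF" 'NF >= 8 && $6 < cutoff {print $8}' | \
  while read -r f; do
    hdfs dfs -rm -skipTrash "$f"
  done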

After that, the Spark History Server should start up without issues.