Part-5: Using Spark as execution engine for Hive


In this tutorial we discuss how to use Spark as the execution engine for Hive. The default execution engine for Hive is MapReduce, which is usually slower; Spark is a faster engine for running queries on Hive.

Installation

I assume you already have working Hadoop, Hive and Spark installations on your VM. Follow the Part-1, Part-2 (optional), Part-3 and Part-4 articles to install Hadoop, Hive and Spark.

It’s important to make sure that your Spark and Hive versions are compatible with each other. Check the Hive on Spark compatibility matrix at the link below:

https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started

Use the link above to determine which Hive and Spark versions to install.

Make sure the environment variables below exist in your ~/.bashrc file. The JAVA_HOME variable should point to your Java installation directory.

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=$PATH:$JAVA_HOME/bin
export HADOOP_HOME=/usr/local/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export HADOOP_CLASSPATH=/usr/lib/jvm/java-8-openjdk-amd64/lib/tools.jar

# Set HIVE_HOME
export HIVE_HOME="/usr/lib/hive/apache-hive-3.1.2-bin"
PATH=$PATH:$HIVE_HOME/bin
export PATH

#Set SPARK home
export SPARK_HOME=/usr/lib/spark/spark-2.3.0-bin-hadoop2.7
PATH=$PATH:$SPARK_HOME/bin
export PATH
export SPARK_MASTER_HOST=localhost
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=7180

Source ~/.bashrc again to reload environment variables.

source ~/.bashrc
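
To confirm the variables were picked up, you can print a couple of them and check the tool versions (a quick sanity check, not a required step):

echo $HIVE_HOME
echo $SPARK_HOME
hadoop version
spark-submit --version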

Link the Scala and Spark jars into the Hive lib folder.

cd $HIVE_HOME/lib
ln -s $SPARK_HOME/jars/scala-library*.jar
ln -s $SPARK_HOME/jars/spark-core*.jar
ln -s $SPARK_HOME/jars/spark-network-common*.jar
ln -s $SPARK_HOME/jars/spark-network-shuffle*.jar
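
You can verify that the symlinks were created and point to real files:

ls -l $HIVE_HOME/lib | grep -E 'scala-library|spark-'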

Add the configuration below to hive-site.xml to use Spark as the execution engine.

vi $HIVE_HOME/conf/hive-site.xml
<property>
    <name>hive.execution.engine</name>
    <value>spark</value>
    <description>Use Spark as the default execution engine</description>
</property>
<property>
    <name>spark.master</name>
    <value>spark://localhost:7077</value>
</property>
<property>
    <name>spark.eventLog.enabled</name>
    <value>true</value>
</property>
<property>
    <name>spark.eventLog.dir</name>
    <value>/tmp</value>
</property>
<property>
    <name>spark.serializer</name>
    <value>org.apache.spark.serializer.KryoSerializer</value>
</property>
<property>
    <name>spark.yarn.jars</name>
    <value>hdfs://localhost:54310/spark-jars/*</value>
</property>
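
The spark.yarn.jars property above points to a location in HDFS, so the Spark jars need to be available there. A sketch of uploading them, assuming the NameNode address hdfs://localhost:54310 and the /spark-jars directory from the property above:

hdfs dfs -mkdir -p /spark-jars
hdfs dfs -put $SPARK_HOME/jars/*.jar /spark-jars/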

Make sure the properties below exist in yarn-site.xml. These properties list the Hadoop jar paths and are required to use Spark as the execution engine for Hive. Note that I am using absolute paths instead of environment variables in this configuration; for some reason environment variables did not work here for me. Adjust these paths to match your Hadoop installation directories.

vi $HADOOP_CONF_DIR/yarn-site.xml
<property>
    <name>yarn.application.classpath</name>
    <value>/usr/local/hadoop/share/hadoop/mapreduce/*,/usr/local/hadoop/share/hadoop/mapreduce/lib/*,/usr/local/hadoop/share/hadoop/hdfs/*,/usr/local/hadoop/share/hadoop/hdfs/lib/*,/usr/local/hadoop/share/hadoop/common/lib/*,/usr/local/hadoop/share/hadoop/common/*,/usr/local/hadoop/share/hadoop/yarn/lib/*,/usr/local/hadoop/share/hadoop/yarn/*</value>
</property>
<property>
    <name>mapreduce.application.classpath</name>
    <value>/usr/local/hadoop/share/hadoop/mapreduce/*,/usr/local/hadoop/share/hadoop/mapreduce/lib/*,/usr/local/hadoop/share/hadoop/hdfs/*,/usr/local/hadoop/share/hadoop/hdfs/lib/*,/usr/local/hadoop/share/hadoop/common/lib/*,/usr/local/hadoop/share/hadoop/common/*,/usr/local/hadoop/share/hadoop/yarn/lib/*,/usr/local/hadoop/share/hadoop/yarn/*</value>
</property>
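
If you are unsure what the correct paths are on your machine, the hadoop classpath command prints the directories your installation actually uses, and you can adapt its output into the values above:

hadoop classpath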

Remove the old Hive jars from the Spark jars folder. Adjust this step to match the version of the Hive jars shipped with your Spark distribution; you can determine the version by listing the contents of the $SPARK_HOME/jars folder with the command below:

ls $SPARK_HOME/jars/*hive*.jar

In my case the Hive jars above were version 1.2.1. Delete them with the command below.

rm $SPARK_HOME/jars/hive*1.2.1*

All configurations are now complete. Now run Hive and try inserting a new record into a table. You should see a Spark job running.
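
The example below assumes an employee table already exists. If it does not, a minimal sketch to create one (the column names id, name and dept are assumed here for illustration, not taken from an existing schema):

hive> create table if not exists employee (id string, name string, dept string);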

hive> insert into employee values("3","Bob","Sales");
Query ID = hadoop_user_20201012024838_537a530c-023b-4a51-bd1c-4e1511158b61
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Hive on Spark Session Web UI URL: http://localhost:4040

Query Hive on Spark job[0] stages: [0, 1]
Spark job[0] status = RUNNING
--------------------------------------------------------------------------------------
          STAGES   ATTEMPT        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  
--------------------------------------------------------------------------------------
Stage-0 ........         0      FINISHED      1          1        0        0       0  
Stage-1 ........         0      FINISHED      1          1        0        0       0  
--------------------------------------------------------------------------------------
STAGES: 02/02    [==========================>>] 100%  ELAPSED TIME: 9.10 s     
--------------------------------------------------------------------------------------
Spark job[0] finished successfully in 9.10 second(s)
Loading data to table default.employee
OK
Time taken: 31.832 seconds
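
You can then verify that the row was written with a quick query:

hive> select * from employee;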

Troubleshooting

When you insert a record, the error below may occur.

Job failed with java.lang.NumberFormatException: For input string: "30s"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:589)
        at java.lang.Long.parseLong(Long.java:631)
        at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1311)
        at org.apache.hadoop.hdfs.DFSClient$Conf.<init>(DFSClient.java:502)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:638)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
        at org.apache.hadoop.hive.ql.exec.Utilities.isEmptyPath(Utilities.java:2610)
        at org.apache.hadoop.hive.ql.exec.Utilities.isEmptyPath(Utilities.java:2606)
        at org.apache.hadoop.hive.ql.exec.Utilities$GetInputPathsCallable.call(Utilities.java:3432)
        at org.apache.hadoop.hive.ql.exec.Utilities.getInputPaths(Utilities.java:3370)
        at org.apache.hadoop.hive.ql.exec.spark.SparkPlanGenerator.cloneJobConf(SparkPlanGenerator.java:318)
        at org.apache.hadoop.hive.ql.exec.spark.SparkPlanGenerator.generate(SparkPlanGenerator.java:241)
        at org.apache.hadoop.hive.ql.exec.spark.SparkPlanGenerator.generate(SparkPlanGenerator.java:113)
        at org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:359)
        at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:378)
        at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:343)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed during runtime. Please check stacktrace for the root cause.

As you can see in the error message, this happens because of a number format problem with a value set in Hadoop's hdfs-site.xml configuration file. I found an article about this error at the link below.

https://stackoverflow.com/questions/61369722/apache-tez-job-fails-due-to-java-lang-numberformatexception-for-input-string-3

To solve the above error, edit the hdfs-site.xml file.

vi $HADOOP_CONF_DIR/hdfs-site.xml

Add the property below. The default value is “30s”, which is not compatible with the Hadoop 2.x libraries (they cannot parse the time-unit suffix, hence the NumberFormatException above).

<property>
    <name>dfs.client.datanode-restart.timeout</name>
    <value>30</value>
</property>
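
After updating the file (and restarting your Hive session), you can confirm the effective value with hdfs getconf; this is just a sanity check, not a required step:

hdfs getconf -confKey dfs.client.datanode-restart.timeout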

After the above change, the insert query should work fine.

If you see the error below, it means you have not configured Spark with Hive properly, or you are using an unsupported version of Spark with Hive.

FAILED: SemanticException Failed to get a spark session: org.apache.hadoop.hive.ql.metadata.HiveException: Failed to create Spark client for Spark session db1fba45-1d05-438f-ba78-08da0e547977

Check Spark and Hive version compatibility at the Hive on Spark link given earlier in this article.
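
As a first diagnostic, you can check from the Hive CLI which engine is active, and temporarily fall back to MapReduce to rule out problems on the Hive side:

hive> set hive.execution.engine;
hive> set hive.execution.engine=mr;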
