::Go back to Oozie Documentation Index::


Oozie Spark Action Extension

Spark Action

The spark action runs a Spark job.

The workflow job will wait until the Spark job completes before continuing to the next action.

To run the Spark job, you have to configure the spark action with the =job-tracker=, name-node , Spark master elements as well as the necessary elements, arguments and configuration.

Spark options can be specified in an element called spark-opts

A spark action can be configured to create or delete HDFS directories before starting the Spark job.

Oozie EL expressions can be used in the inline configuration. Property values specified in the configuration element override values specified in the job-xml file.

Syntax:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
    ...
    <action name="[NODE-NAME]">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
               <delete path="[PATH]"/>
               ...
               <mkdir path="[PATH]"/>
               ...
            </prepare>
            <job-xml>[SPARK SETTINGS FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <master>[SPARK MASTER URL]</master>
            <mode>[SPARK MODE]</mode>
            <name>[SPARK JOB NAME]</name>
            <class>[SPARK MAIN CLASS]</class>
            <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
            <spark-opts>[SPARK-OPTIONS]</spark-opts>
            <arg>[ARG-VALUE]</arg>
                ...
            <arg>[ARG-VALUE]</arg>
            ...
        </spark>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>

The prepare element, if present, indicates a list of paths to delete or create before starting the job. Specified paths must start with hdfs://HOST:PORT .

The job-xml element, if present, specifies a file containing configuration for the Spark job. Multiple job-xml elements are allowed in order to specify multiple job.xml files.

The configuration element, if present, contains configuration properties that are passed to the Spark job.

The master element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn-cluster, yarn-master, or local.

The mode element if present indicates the mode of spark, where to run spark driver program. Ex: client,cluster. This is typically not required because you can specify it as part of master (i.e. master=yarn, mode=client is equivalent to master=yarn-client). A local master always runs in client mode.

Depending on the master (and mode ) entered, the Spark job will run differently as follows:

  • local mode: everything runs here in the Launcher Job.
  • yarn-client mode: the driver runs here in the Launcher Job and the executor in Yarn.
  • yarn-cluster mode: the driver and executor run in Yarn.

The name element indicates the name of the spark application.

The class element if present, indicates the spark's application main class.

The jar element indicates a comma separated list of jars or python files.

The spark-opts element, if present, contains a list of spark options that can be passed to spark driver. Spark configuration options can be passed by specifying '--conf key=value' here, or from oozie.service.SparkConfigurationService.spark.configurations in oozie-site.xml. Values containing whitespaces can be enclosed by double quotes. The spark-opts configs have priority.

Some examples of the spark-opts element:

  • '--conf key=value'
  • '--conf key1=value1 value2'
  • '--conf key1="value1 value2"'
  • '--conf key1=value1 key2="value2 value3"'
  • '--conf key=value --verbose'

The arg element if present, contains arguments that can be passed to spark application.

All the above elements can be parameterized (templatized) using EL expressions.

Example:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirstsparkjob">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${jobOutput}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <master>local[*]</master>
            <mode>client<mode>
            <name>Spark Example</name>
            <class>org.apache.spark.examples.mllib.JavaALS</class>
            <jar>/lib/spark-examples_2.10-1.1.0.jar</jar>
            <spark-opts>--executor-memory 20G --num-executors 50
             --conf spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"</spark-opts>
            <arg>inputpath=hdfs://localhost/input/file.txt</arg>
            <arg>value=2</arg>
        </spark>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

Spark Action Logging

Spark action logs are redirected to the Oozie Launcher map-reduce job task STDOUT/STDERR that runs Spark.

From Oozie web-console, from the Spark action pop up using the 'Console URL' link, it is possible to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tracker web-console.

Spark on YARN

To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark configuration properties either in spark-opts with --conf or from oozie.service.SparkConfigurationService.spark.configurations in oozie-site.xml.

1. spark.yarn.historyServer.address=SPH-HOST:18088

2. spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory

3. spark.eventLog.enabled=true

PySpark with Spark Action

To submit PySpark scripts with Spark Action, pyspark dependencies must be available in sharelib or in workflow's lib/ directory. For more information, please refer to installation document.

Example:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ....
    <action name="myfirstpysparkjob">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${jobOutput}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <master>yarn-cluster</master>
            <name>Spark Example</name>
            <jar>pi.py</jar>
            <spark-opts>--executor-memory 20G --num-executors 50
            --conf spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"</spark-opts>
            <arg>100</arg>
        </spark>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

The jar element indicates python file. Refer to the file by it's localized name, because only local files are allowed in PySpark. The py file should be in the lib/ folder next to the workflow.xml or added using the file element so that it's localized to the working directory with just its name.

Using Symlink in <jar>

A symlink must be specified using file element. Then, you can use the symlink name in jar element.

Example:

Specifying relative path for symlink:

Make sure that the file is within the application directory i.e. oozie.wf.application.path .

        <spark xmlns="uri:oozie:spark-action:0.2">
        ...
            <jar>py-spark-example-symlink.py</jar>
            ...
            ...
            <file>py-spark.py#py-spark-example-symlink.py</file>
        ...
        </spark>

Specifying full path for symlink:

        <spark xmlns="uri:oozie:spark-action:0.2">
        ...
            <jar>spark-example-symlink.jar</jar>
            ...
            ...
            <file>hdfs://localhost:8020/user/testjars/all-oozie-examples.jar#spark-example-symlink.jar</file>
        ...
        </spark>

Appendix, Spark XML-Schema

AE.A Appendix A, Spark XML-Schema

Spark Action Schema Version 0.1

<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:spark="uri:oozie:spark-action:0.1" elementFormDefault="qualified"
           targetNamespace="uri:oozie:spark-action:0.1">    <xs:element name="spark" type="spark:ACTION"/>
    <xs:complexType name="ACTION">
        <xs:sequence>
            <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
            <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="CONFIGURATION">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="PREPARE">
        <xs:sequence>
            <xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="DELETE">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>
    <xs:complexType name="MKDIR">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>
</xs:schema>

Spark Action Schema Version 0.2

<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:spark="uri:oozie:spark-action:0.2" elementFormDefault="qualified"
           targetNamespace="uri:oozie:spark-action:0.2">    <xs:element name="spark" type="spark:ACTION"/>
    <xs:complexType name="ACTION">
        <xs:sequence>
            <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
            <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="CONFIGURATION">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="PREPARE">
        <xs:sequence>
            <xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="DELETE">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>
    <xs:complexType name="MKDIR">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>
</xs:schema>
::Go back to Oozie Documentation Index::