001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie;
019
020import java.io.IOException;
021import java.io.Writer;
022import java.util.Map;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.oozie.client.CoordinatorJob;
026import org.apache.oozie.client.WorkflowJob;
027import org.apache.oozie.executor.jpa.JPAExecutorException;
028import org.apache.oozie.service.JMSTopicService;
029import org.apache.oozie.service.Services;
030
031public abstract class BaseEngine {
032    public static final String USE_XCOMMAND = "oozie.useXCommand";
033
034    protected String user;
035
036    /**
037     * Return the user name.
038     *
039     * @return the user name.
040     */
041    public String getUser() {
042        return user;
043    }
044
045    /**
046     * Submit a job.
047     * <p/>
048     * It validates configuration properties.
049     *
050     * @param conf job configuration.
051     * @param startJob indicates if the job should be started or not.
052     * @return the job Id.
053     * @throws BaseEngineException thrown if the job could not be created.
054     */
055    public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException;
056
057    /**
058     * Start a job.
059     *
060     * @param jobId job Id.
061     * @throws BaseEngineException thrown if the job could not be started.
062     */
063    public abstract void start(String jobId) throws BaseEngineException;
064
065    /**
066     * Resume a job.
067     *
068     * @param jobId job Id.
069     * @throws BaseEngineException thrown if the job could not be resumed.
070     */
071    public abstract void resume(String jobId) throws BaseEngineException;
072
073    /**
074     * Suspend a job.
075     *
076     * @param jobId job Id.
077     * @throws BaseEngineException thrown if the job could not be suspended.
078     */
079    public abstract void suspend(String jobId) throws BaseEngineException;
080
081    /**
082     * Kill a job.
083     *
084     * @param jobId job Id.
085     * @throws BaseEngineException thrown if the job could not be killed.
086     */
087    public abstract void kill(String jobId) throws BaseEngineException;
088
089    /**
090     * Change a coordinator job.
091     *
092     * @param jobId job Id.
093     * @param changeValue change value.
094     * @throws BaseEngineException thrown if the job could not be changed.
095     */
096    public abstract void change(String jobId, String changeValue) throws BaseEngineException;
097
098    /**
099     * Rerun a job.
100     *
101     * @param jobId job Id to rerun.
102     * @param conf configuration information for the rerun.
103     * @throws BaseEngineException thrown if the job could not be rerun.
104     */
105    public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException;
106
107    /**
108     * Return the info about a wf job.
109     *
110     * @param jobId job Id.
111     * @return the workflow job info.
112     * @throws DagEngineException thrown if the job info could not be obtained.
113     */
114    public abstract WorkflowJob getJob(String jobId) throws BaseEngineException;
115
116    /**
117     * Return the info about a wf job with actions subset.
118     *
119     * @param jobId job Id
120     * @param start starting from this index in the list of actions belonging to the job
121     * @param length number of actions to be returned
122     * @return the workflow job info.
123     * @throws DagEngineException thrown if the job info could not be obtained.
124     */
125    public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException;
126
127    /**
128     * Return the info about a coord job.
129     *
130     * @param jobId job Id.
131     * @return the coord job info.
132     * @throws BaseEngineException thrown if the job info could not be obtained.
133     */
134    public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException;
135
136    /**
137     * Return the info about a coord job with actions subset.
138     *
139     * @param jobId job Id.
140     * @param filter the status filter
141     * @param start starting from this index in the list of actions belonging to the job
142     * @param length number of actions to be returned
143     * @param order true if actions are sorted in a descending order of nominal time, false if asc order
144     * @return the coord job info.
145     * @throws BaseEngineException thrown if the job info could not be obtained.
146     */
147    public abstract CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
148            throws BaseEngineException;
149
150    /**
151     * Return the a job definition.
152     *
153     * @param jobId job Id.
154     * @return the job definition.
155     * @throws BaseEngineException thrown if the job definition could no be obtained.
156     */
157    public abstract String getDefinition(String jobId) throws BaseEngineException;
158
159    /**
160     * Stream the log of a job.
161     *
162     * @param jobId job Id.
163     * @param writer writer to stream the log to.
164     * @param params additional parameters from the request
165     * @throws IOException thrown if the log cannot be streamed.
166     * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
167     *         jobId.
168     */
169    public abstract void streamLog(String jobId, Writer writer, Map<String, String[]> params)
170            throws IOException, BaseEngineException;
171
172    /**
173     * Return the workflow Job ID for an external ID.
174     * <p/>
175     * This is reverse lookup for recovery purposes.
176     *
177     * @param externalId external ID provided at job submission time.
178     * @return the associated workflow job ID if any, <code>null</code> if none.
179     * @throws BaseEngineException thrown if the lookup could not be done.
180     */
181    public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException;
182
183    /**
184     * Dry run a job; like {@link BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean) but doesn't actually execute
185     * the job.
186     * <p/>
187     * It validates configuration properties.
188     *
189     * @param conf job configuration.
190     * @return the result of the dryrun
191     * @throws BaseEngineException thrown if there was a problem doing the dryrun
192     */
193    public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException;
194
195
196    /**
197     * Return the jms topic name for the job.
198     *
199     * @param jobId job Id.
200     * @return String the topic name
201     * @throws DagEngineException thrown if the jms info could not be obtained.
202     */
203    public String getJMSTopicName(String jobId) throws DagEngineException {
204        JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
205        if (jmsTopicService != null) {
206            try {
207                return jmsTopicService.getTopic(jobId);
208            }
209            catch (JPAExecutorException e) {
210               throw new DagEngineException(ErrorCode.E1602, e);
211            }
212        }
213        else {
214            throw new DagEngineException(ErrorCode.E1602,
215                    "JMSTopicService is not initialized. JMS notification"
216                            + "may not be enabled");
217        }
218    }
219
220}