001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie;
020
021import java.io.IOException;
022import java.io.Writer;
023import java.util.Map;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.oozie.client.CoordinatorJob;
027import org.apache.oozie.client.OozieClientException;
028import org.apache.oozie.client.WorkflowJob;
029import org.apache.oozie.command.CommandException;
030import org.apache.oozie.executor.jpa.JPAExecutorException;
031import org.apache.oozie.service.JMSTopicService;
032import org.apache.oozie.service.Services;
033import org.apache.oozie.util.XLogAuditStreamer;
034import org.apache.oozie.util.XLogErrorStreamer;
035import org.apache.oozie.util.XLogStreamer;
036
037public abstract class BaseEngine {
038    public static final String USE_XCOMMAND = "oozie.useXCommand";
039
040    protected String user;
041
042    /**
043     * Return the user name.
044     *
045     * @return the user name.
046     */
047    public String getUser() {
048        return user;
049    }
050
051    /**
052     * Submit a job.
053     * <p>
054     * It validates configuration properties.
055     *
056     * @param conf job configuration.
057     * @param startJob indicates if the job should be started or not.
058     * @return the job Id.
059     * @throws BaseEngineException thrown if the job could not be created.
060     */
061    public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException;
062
063    /**
064     * Start a job.
065     *
066     * @param jobId job Id.
067     * @throws BaseEngineException thrown if the job could not be started.
068     */
069    public abstract void start(String jobId) throws BaseEngineException;
070
071    /**
072     * Resume a job.
073     *
074     * @param jobId job Id.
075     * @throws BaseEngineException thrown if the job could not be resumed.
076     */
077    public abstract void resume(String jobId) throws BaseEngineException;
078
079    /**
080     * Suspend a job.
081     *
082     * @param jobId job Id.
083     * @throws BaseEngineException thrown if the job could not be suspended.
084     */
085    public abstract void suspend(String jobId) throws BaseEngineException;
086
087    /**
088     * Kill a job.
089     *
090     * @param jobId job Id.
091     * @throws BaseEngineException thrown if the job could not be killed.
092     */
093    public abstract void kill(String jobId) throws BaseEngineException;
094
095    /**
096     * Change a coordinator job.
097     *
098     * @param jobId job Id.
099     * @param changeValue change value.
100     * @throws BaseEngineException thrown if the job could not be changed.
101     */
102    public abstract void change(String jobId, String changeValue) throws BaseEngineException;
103
104    /**
105     * Rerun a job.
106     *
107     * @param jobId job Id to rerun.
108     * @param conf configuration information for the rerun.
109     * @throws BaseEngineException thrown if the job could not be rerun.
110     */
111    public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException;
112
113    /**
114     * Return the info about a wf job.
115     *
116     * @param jobId job Id.
117     * @return the workflow job info.
118     * @throws DagEngineException thrown if the job info could not be obtained.
119     */
120    public abstract WorkflowJob getJob(String jobId) throws BaseEngineException;
121
122    /**
123     * Return the info about a wf job with actions subset.
124     *
125     * @param jobId job Id
126     * @param start starting from this index in the list of actions belonging to the job
127     * @param length number of actions to be returned
128     * @return the workflow job info.
129     * @throws DagEngineException thrown if the job info could not be obtained.
130     */
131    public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException;
132
133    /**
134     * Return the info about a coord job.
135     *
136     * @param jobId job Id.
137     * @return the coord job info.
138     * @throws BaseEngineException thrown if the job info could not be obtained.
139     */
140    public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException;
141
142    /**
143     * Return the info about a coord job with actions subset.
144     *
145     * @param jobId job Id.
146     * @param filter the status filter
147     * @param start starting from this index in the list of actions belonging to the job
148     * @param length number of actions to be returned
149     * @param desc true if actions are sorted in a descending order of nominal time, false if asc order
150     * @return the coord job info.
151     * @throws BaseEngineException thrown if the job info could not be obtained.
152     */
153    public abstract CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
154            throws BaseEngineException;
155
156    /**
157     * Return the a job definition.
158     *
159     * @param jobId job Id.
160     * @return the job definition.
161     * @throws BaseEngineException thrown if the job definition could no be obtained.
162     */
163    public abstract String getDefinition(String jobId) throws BaseEngineException;
164
165    /**
166     * Stream the log of a job.
167     *
168     * @param jobId job Id.
169     * @param writer writer to stream the log to.
170     * @param requestParameters additional parameters from the request
171     * @throws IOException thrown if the log cannot be streamed.
172     * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
173     *         jobId.
174     */
175    public void streamLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException,
176            BaseEngineException {
177        try {
178
179            streamJobLog(new XLogStreamer(requestParameters), jobId, writer);
180        }
181        catch (CommandException e) {
182            throw new IOException(e);
183        }
184    }
185
186    /**
187     * Stream error log of a job.
188     *
189     * @param jobId job Id.
190     * @param writer writer to stream the log to.
191     * @param requestParameters additional parameters from the request
192     * @throws IOException thrown if the log cannot be streamed.
193     * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
194     *         jobId.
195     */
196    public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException,
197            BaseEngineException {
198        try {
199
200            streamJobLog(new XLogErrorStreamer(requestParameters), jobId, writer);
201        }
202        catch (CommandException e) {
203            throw new IOException(e);
204        }
205    }    /**
206     * Stream Audit log of a job.
207     *
208     * @param jobId job Id.
209     * @param writer writer to stream the log to.
210     * @param requestParameters additional parameters from the request
211     * @throws IOException thrown if the log cannot be streamed.
212     * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
213     *         jobId.
214     */
215    public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException,
216            BaseEngineException {
217        try {
218            streamJobLog(new XLogAuditStreamer(requestParameters), jobId, writer);
219        }
220        catch (CommandException e) {
221            throw new IOException(e);
222        }
223    }
224
225    /**
226     * Return the workflow Job ID for an external ID.
227     * <p>
228     * This is reverse lookup for recovery purposes.
229     *
230     * @param externalId external ID provided at job submission time.
231     * @return the associated workflow job ID if any, <code>null</code> if none.
232     * @throws BaseEngineException thrown if the lookup could not be done.
233     */
234    public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException;
235
236    /**
237     * Dry run a job; like {@link BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)} but doesn't actually execute
238     * the job.
239     * <p>
240     * It validates configuration properties.
241     *
242     * @param conf job configuration.
243     * @return the result of the dryrun
244     * @throws BaseEngineException thrown if there was a problem doing the dryrun
245     */
246    public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException;
247
248
249    /**
250     * Return the jms topic name for the job.
251     *
252     * @param jobId job Id.
253     * @return String the topic name
254     * @throws DagEngineException thrown if the jms info could not be obtained.
255     */
256    public String getJMSTopicName(String jobId) throws DagEngineException {
257        JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
258        if (jmsTopicService != null) {
259            try {
260                return jmsTopicService.getTopic(jobId);
261            }
262            catch (JPAExecutorException e) {
263               throw new DagEngineException(ErrorCode.E1602, e);
264            }
265        }
266        else {
267            throw new DagEngineException(ErrorCode.E1602,
268                    "JMSTopicService is not initialized. JMS notification"
269                            + "may not be enabled");
270        }
271    }
272
273    /**
274     * Return the status for a Job ID
275     *
276     * @param jobId job Id.
277     * @return the job's status
278     * @throws BaseEngineException thrown if the job's status could not be obtained
279     */
280    public abstract String getJobStatus(String jobId) throws BaseEngineException;
281
282    /**
283     * Enable SLA alert for job
284     * @param id job ID
285     * @param actions list of actions
286     * @param dates dates
287     * @param childIds child IDs
288     * @throws BaseEngineException thrown if SLA alert could not be enabled
289     */
290    public abstract void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException;
291
292    /**
293     * Disable SLA alert for job
294     * @param id job ID
295     * @param actions list of actions
296     * @param dates dates
297     * @param childIds child IDs
298     * @throws BaseEngineException thrown if SLA alert could not be disabled
299     */
300    public abstract void disableSLAAlert(String id, String actions, String  dates, String childIds) throws BaseEngineException;
301
302    /**
303     * Change SLA properties for job
304     * @param id job ID
305     * @param actions list of actions
306     * @param dates dates
307     * @param childIds child IDs
308     * @param newParams parameters to
309     * @throws BaseEngineException thrown if SLA alert could not be enabled
310     */
311    public abstract void changeSLA(String id, String actions, String  dates, String childIds, String newParams)
312            throws BaseEngineException;
313
314    /**
315     * Stream job log.
316     *
317     * @param logStreamer the log streamer
318     * @param jobId the job id
319     * @param writer the writer
320     * @throws IOException Signals that an I/O exception has occurred.
321     * @throws BaseEngineException the base engine exception
322     */
323    protected abstract void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException,
324            BaseEngineException;
325
326    interface BaseEngineCallable<V> {
327        V callOrThrow() throws BaseEngineException;
328    }
329
330    static <V> V callOrRethrow(final BaseEngineCallable<V> callable) throws OozieClientException {
331        try {
332            return callable.callOrThrow();
333        } catch (final BaseEngineException e) {
334            throw new OozieClientException(e.getErrorCode().toString(), e);
335        }
336    }
337
338    static <V> V throwNoOp() throws OozieClientException {
339        throw new OozieClientException(ErrorCode.E0301.toString(), "no-op");
340    }
341}