001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie;
020
021import java.io.IOException;
022import java.io.Writer;
023import java.util.Date;
024import java.util.Map;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.client.CoordinatorJob;
028import org.apache.oozie.client.WorkflowJob;
029import org.apache.oozie.executor.jpa.JPAExecutorException;
030import org.apache.oozie.service.JMSTopicService;
031import org.apache.oozie.service.Services;
032import org.apache.oozie.service.XLogStreamingService;
033import org.apache.oozie.util.XLogFilter;
034
035public abstract class BaseEngine {
036    public static final String USE_XCOMMAND = "oozie.useXCommand";
037
038    public enum LOG_TYPE {
039        LOG, ERROR_LOG, AUDIT_LOG
040    }
041
042    protected String user;
043
044    /**
045     * Return the user name.
046     *
047     * @return the user name.
048     */
049    public String getUser() {
050        return user;
051    }
052
053    /**
054     * Submit a job.
055     * <p>
056     * It validates configuration properties.
057     *
058     * @param conf job configuration.
059     * @param startJob indicates if the job should be started or not.
060     * @return the job Id.
061     * @throws BaseEngineException thrown if the job could not be created.
062     */
063    public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException;
064
065    /**
066     * Start a job.
067     *
068     * @param jobId job Id.
069     * @throws BaseEngineException thrown if the job could not be started.
070     */
071    public abstract void start(String jobId) throws BaseEngineException;
072
073    /**
074     * Resume a job.
075     *
076     * @param jobId job Id.
077     * @throws BaseEngineException thrown if the job could not be resumed.
078     */
079    public abstract void resume(String jobId) throws BaseEngineException;
080
081    /**
082     * Suspend a job.
083     *
084     * @param jobId job Id.
085     * @throws BaseEngineException thrown if the job could not be suspended.
086     */
087    public abstract void suspend(String jobId) throws BaseEngineException;
088
089    /**
090     * Kill a job.
091     *
092     * @param jobId job Id.
093     * @throws BaseEngineException thrown if the job could not be killed.
094     */
095    public abstract void kill(String jobId) throws BaseEngineException;
096
097    /**
098     * Change a coordinator job.
099     *
100     * @param jobId job Id.
101     * @param changeValue change value.
102     * @throws BaseEngineException thrown if the job could not be changed.
103     */
104    public abstract void change(String jobId, String changeValue) throws BaseEngineException;
105
106    /**
107     * Rerun a job.
108     *
109     * @param jobId job Id to rerun.
110     * @param conf configuration information for the rerun.
111     * @throws BaseEngineException thrown if the job could not be rerun.
112     */
113    public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException;
114
115    /**
116     * Return the info about a wf job.
117     *
118     * @param jobId job Id.
119     * @return the workflow job info.
120     * @throws DagEngineException thrown if the job info could not be obtained.
121     */
122    public abstract WorkflowJob getJob(String jobId) throws BaseEngineException;
123
124    /**
125     * Return the info about a wf job with actions subset.
126     *
127     * @param jobId job Id
128     * @param start starting from this index in the list of actions belonging to the job
129     * @param length number of actions to be returned
130     * @return the workflow job info.
131     * @throws DagEngineException thrown if the job info could not be obtained.
132     */
133    public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException;
134
135    /**
136     * Return the info about a coord job.
137     *
138     * @param jobId job Id.
139     * @return the coord job info.
140     * @throws BaseEngineException thrown if the job info could not be obtained.
141     */
142    public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException;
143
144    /**
145     * Return the info about a coord job with actions subset.
146     *
147     * @param jobId job Id.
148     * @param filter the status filter
149     * @param start starting from this index in the list of actions belonging to the job
150     * @param length number of actions to be returned
151     * @param desc true if actions are sorted in a descending order of nominal time, false if asc order
152     * @return the coord job info.
153     * @throws BaseEngineException thrown if the job info could not be obtained.
154     */
155    public abstract CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
156            throws BaseEngineException;
157
158    /**
159     * Return the a job definition.
160     *
161     * @param jobId job Id.
162     * @return the job definition.
163     * @throws BaseEngineException thrown if the job definition could no be obtained.
164     */
165    public abstract String getDefinition(String jobId) throws BaseEngineException;
166
167    /**
168     * Stream the log of a job.
169     *
170     * @param jobId job Id.
171     * @param writer writer to stream the log to.
172     * @param params additional parameters from the request
173     * @throws IOException thrown if the log cannot be streamed.
174     * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
175     *         jobId.
176     */
177    public abstract void streamLog(String jobId, Writer writer, Map<String, String[]> params)
178            throws IOException, BaseEngineException;
179
180    /**
181     * Stream error log of a job.
182     *
183     * @param jobId job Id.
184     * @param writer writer to stream the log to.
185     * @param params additional parameters from the request
186     * @throws IOException thrown if the log cannot be streamed.
187     * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
188     *         jobId.
189     */
190    public abstract void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
191            BaseEngineException;
192    /**
193     * Stream Audit log of a job.
194     *
195     * @param jobId job Id.
196     * @param writer writer to stream the log to.
197     * @param params additional parameters from the request
198     * @throws IOException thrown if the log cannot be streamed.
199     * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
200     *         jobId.
201     */
202    public abstract void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
203            BaseEngineException;
204
205
206    /**
207     * Return the workflow Job ID for an external ID.
208     * <p>
209     * This is reverse lookup for recovery purposes.
210     *
211     * @param externalId external ID provided at job submission time.
212     * @return the associated workflow job ID if any, <code>null</code> if none.
213     * @throws BaseEngineException thrown if the lookup could not be done.
214     */
215    public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException;
216
217    /**
218     * Dry run a job; like {@link BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)} but doesn't actually execute
219     * the job.
220     * <p>
221     * It validates configuration properties.
222     *
223     * @param conf job configuration.
224     * @return the result of the dryrun
225     * @throws BaseEngineException thrown if there was a problem doing the dryrun
226     */
227    public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException;
228
229
230    /**
231     * Return the jms topic name for the job.
232     *
233     * @param jobId job Id.
234     * @return String the topic name
235     * @throws DagEngineException thrown if the jms info could not be obtained.
236     */
237    public String getJMSTopicName(String jobId) throws DagEngineException {
238        JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
239        if (jmsTopicService != null) {
240            try {
241                return jmsTopicService.getTopic(jobId);
242            }
243            catch (JPAExecutorException e) {
244               throw new DagEngineException(ErrorCode.E1602, e);
245            }
246        }
247        else {
248            throw new DagEngineException(ErrorCode.E1602,
249                    "JMSTopicService is not initialized. JMS notification"
250                            + "may not be enabled");
251        }
252    }
253
254    /**
255     * Return the status for a Job ID
256     *
257     * @param jobId job Id.
258     * @return the job's status
259     * @throws BaseEngineException thrown if the job's status could not be obtained
260     */
261    public abstract String getJobStatus(String jobId) throws BaseEngineException;
262
263    /**
264     * Return the status for a Job ID
265     *
266     * @param jobId job Id.
267     * @return the job's status
268     * @throws BaseEngineException thrown if the job's status could not be obtained
269     */
270
271    /**
272     * Enable SLA alert for job
273     * @param id
274     * @param actions
275     * @param dates
276     * @param childIds
277     * @throws BaseEngineException
278     */
279    public abstract void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException;
280
281    /**
282     * Disable SLA alert for job
283     * @param id
284     * @param actions
285     * @param dates
286     * @param childIds
287     * @throws BaseEngineException
288     */
289    public abstract void disableSLAAlert(String id, String actions, String  dates, String childIds) throws BaseEngineException;
290
291    /**
292     * Change SLA properties for job
293     * @param id
294     * @param actions
295     * @param childIds
296     * @param newParams
297     * @throws BaseEngineException
298     */
299    public abstract void changeSLA(String id, String actions, String  dates, String childIds, String newParams)
300            throws BaseEngineException;
301
302    protected void fetchLog(XLogFilter filter, Date startTime, Date endTime, Writer writer,
303            Map<String, String[]> params, LOG_TYPE logType) throws IOException {
304
305        switch (logType) {
306            case LOG:
307                Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params);
308                break;
309            case ERROR_LOG:
310                Services.get().get(XLogStreamingService.class)
311                        .streamErrorLog(filter, startTime, endTime, writer, params);
312                break;
313            case AUDIT_LOG:
314                Services.get().get(XLogStreamingService.class)
315                        .streamAuditLog(filter, startTime, endTime, writer, params);
316                break;
317            default:
318                throw new IOException("Unsupported log Type");
319        }
320    }
321
322
323}