This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.IOException;
021 import java.io.Writer;
022
023 import org.apache.hadoop.conf.Configuration;
024 import org.apache.oozie.client.CoordinatorJob;
025 import org.apache.oozie.client.WorkflowJob;
026 import org.apache.oozie.executor.jpa.JPAExecutorException;
027 import org.apache.oozie.service.JMSTopicService;
028 import org.apache.oozie.service.Services;
029
030 public abstract class BaseEngine {
031 public static final String USE_XCOMMAND = "oozie.useXCommand";
032
033 protected String user;
034
035 /**
036 * Return the user name.
037 *
038 * @return the user name.
039 */
040 public String getUser() {
041 return user;
042 }
043
044 /**
045 * Submit a job.
046 * <p/>
047 * It validates configuration properties.
048 *
049 * @param conf job configuration.
050 * @param startJob indicates if the job should be started or not.
051 * @return the job Id.
052 * @throws BaseEngineException thrown if the job could not be created.
053 */
054 public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException;
055
056 /**
057 * Start a job.
058 *
059 * @param jobId job Id.
060 * @throws BaseEngineException thrown if the job could not be started.
061 */
062 public abstract void start(String jobId) throws BaseEngineException;
063
064 /**
065 * Resume a job.
066 *
067 * @param jobId job Id.
068 * @throws BaseEngineException thrown if the job could not be resumed.
069 */
070 public abstract void resume(String jobId) throws BaseEngineException;
071
072 /**
073 * Suspend a job.
074 *
075 * @param jobId job Id.
076 * @throws BaseEngineException thrown if the job could not be suspended.
077 */
078 public abstract void suspend(String jobId) throws BaseEngineException;
079
080 /**
081 * Kill a job.
082 *
083 * @param jobId job Id.
084 * @throws BaseEngineException thrown if the job could not be killed.
085 */
086 public abstract void kill(String jobId) throws BaseEngineException;
087
088 /**
089 * Change a coordinator job.
090 *
091 * @param jobId job Id.
092 * @param changeValue change value.
093 * @throws BaseEngineException thrown if the job could not be changed.
094 */
095 public abstract void change(String jobId, String changeValue) throws BaseEngineException;
096
097 /**
098 * Rerun a job.
099 *
100 * @param jobId job Id to rerun.
101 * @param conf configuration information for the rerun.
102 * @throws BaseEngineException thrown if the job could not be rerun.
103 */
104 public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException;
105
106 /**
107 * Return the info about a wf job.
108 *
109 * @param jobId job Id.
110 * @return the workflow job info.
111 * @throws DagEngineException thrown if the job info could not be obtained.
112 */
113 public abstract WorkflowJob getJob(String jobId) throws BaseEngineException;
114
115 /**
116 * Return the info about a wf job with actions subset.
117 *
118 * @param jobId job Id
119 * @param start starting from this index in the list of actions belonging to the job
120 * @param length number of actions to be returned
121 * @return the workflow job info.
122 * @throws DagEngineException thrown if the job info could not be obtained.
123 */
124 public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException;
125
126 /**
127 * Return the info about a coord job.
128 *
129 * @param jobId job Id.
130 * @return the coord job info.
131 * @throws BaseEngineException thrown if the job info could not be obtained.
132 */
133 public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException;
134
135 /**
136 * Return the info about a coord job with actions subset.
137 *
138 * @param jobId job Id.
139 * @param filter the status filter
140 * @param start starting from this index in the list of actions belonging to the job
141 * @param length number of actions to be returned
142 * @param order true if actions are sorted in a descending order of nominal time, false if asc order
143 * @return the coord job info.
144 * @throws BaseEngineException thrown if the job info could not be obtained.
145 */
146 public abstract CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
147 throws BaseEngineException;
148
149 /**
150 * Return the a job definition.
151 *
152 * @param jobId job Id.
153 * @return the job definition.
154 * @throws BaseEngineException thrown if the job definition could no be obtained.
155 */
156 public abstract String getDefinition(String jobId) throws BaseEngineException;
157
158 /**
159 * Stream the log of a job.
160 *
161 * @param jobId job Id.
162 * @param writer writer to stream the log to.
163 * @throws IOException thrown if the log cannot be streamed.
164 * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
165 * jobId.
166 */
167 public abstract void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException;
168
169 /**
170 * Return the workflow Job ID for an external ID.
171 * <p/>
172 * This is reverse lookup for recovery purposes.
173 *
174 * @param externalId external ID provided at job submission time.
175 * @return the associated workflow job ID if any, <code>null</code> if none.
176 * @throws BaseEngineException thrown if the lookup could not be done.
177 */
178 public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException;
179
180 /**
181 * Dry run a job; like {@link BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean) but doesn't actually execute
182 * the job.
183 * <p/>
184 * It validates configuration properties.
185 *
186 * @param conf job configuration.
187 * @return the result of the dryrun
188 * @throws BaseEngineException thrown if there was a problem doing the dryrun
189 */
190 public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException;
191
192
193 /**
194 * Return the jms topic name for the job.
195 *
196 * @param jobId job Id.
197 * @return String the topic name
198 * @throws DagEngineException thrown if the jms info could not be obtained.
199 */
200 public String getJMSTopicName(String jobId) throws DagEngineException {
201 JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
202 if (jmsTopicService != null) {
203 try {
204 return jmsTopicService.getTopic(jobId);
205 }
206 catch (JPAExecutorException e) {
207 throw new DagEngineException(ErrorCode.E1602, e);
208 }
209 }
210 else {
211 throw new DagEngineException(ErrorCode.E1602,
212 "JMSTopicService is not initialized. JMS notification"
213 + "may not be enabled");
214 }
215 }
216
217 }