This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.IOException;
021 import java.io.Writer;
022 import java.util.ArrayList;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.Properties;
028 import java.util.Set;
029 import java.util.StringTokenizer;
030
031 import org.apache.hadoop.conf.Configuration;
032 import org.apache.oozie.client.CoordinatorJob;
033 import org.apache.oozie.client.OozieClient;
034 import org.apache.oozie.client.WorkflowJob;
035 import org.apache.oozie.command.CommandException;
036 import org.apache.oozie.command.wf.CompletedActionCommand;
037 import org.apache.oozie.command.wf.DefinitionCommand;
038 import org.apache.oozie.command.wf.ExternalIdCommand;
039 import org.apache.oozie.command.wf.JobCommand;
040 import org.apache.oozie.command.wf.JobsCommand;
041 import org.apache.oozie.command.wf.KillCommand;
042 import org.apache.oozie.command.wf.ReRunCommand;
043 import org.apache.oozie.command.wf.ResumeCommand;
044 import org.apache.oozie.command.wf.StartCommand;
045 import org.apache.oozie.command.wf.SubmitCommand;
046 import org.apache.oozie.command.wf.SuspendCommand;
047 import org.apache.oozie.service.DagXLogInfoService;
048 import org.apache.oozie.service.Service;
049 import org.apache.oozie.service.Services;
050 import org.apache.oozie.service.XLogService;
051 import org.apache.oozie.util.ParamChecker;
052 import org.apache.oozie.util.XLog;
053 import org.apache.oozie.util.XLogStreamer;
054
055 public abstract class BaseEngine {
056 public static final String USE_XCOMMAND = "oozie.useXCommand";
057
058 protected String user;
059 protected String authToken;
060
061 /**
062 * Return the user name.
063 *
064 * @return the user name.
065 */
066 public String getUser() {
067 return user;
068 }
069
070 /**
071 * Return the authentication token.
072 *
073 * @return the authentication token.
074 */
075 protected String getAuthToken() {
076 return authToken;
077 }
078
079 /**
080 * Submit a job.
081 * <p/>
082 * It validates configuration properties.
083 *
084 * @param conf job configuration.
085 * @param startJob indicates if the job should be started or not.
086 * @return the job Id.
087 * @throws BaseEngineException thrown if the job could not be created.
088 */
089 public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException;
090
091 /**
092 * Start a job.
093 *
094 * @param jobId job Id.
095 * @throws BaseEngineException thrown if the job could not be started.
096 */
097 public abstract void start(String jobId) throws BaseEngineException;
098
099 /**
100 * Resume a job.
101 *
102 * @param jobId job Id.
103 * @throws BaseEngineException thrown if the job could not be resumed.
104 */
105 public abstract void resume(String jobId) throws BaseEngineException;
106
107 /**
108 * Suspend a job.
109 *
110 * @param jobId job Id.
111 * @throws BaseEngineException thrown if the job could not be suspended.
112 */
113 public abstract void suspend(String jobId) throws BaseEngineException;
114
115 /**
116 * Kill a job.
117 *
118 * @param jobId job Id.
119 * @throws BaseEngineException thrown if the job could not be killed.
120 */
121 public abstract void kill(String jobId) throws BaseEngineException;
122
123 /**
124 * Change a coordinator job.
125 *
126 * @param jobId job Id.
127 * @param changeValue change value.
128 * @throws BaseEngineException thrown if the job could not be changed.
129 */
130 public abstract void change(String jobId, String changeValue) throws BaseEngineException;
131
132 /**
133 * Rerun a job.
134 *
135 * @param jobId job Id to rerun.
136 * @param conf configuration information for the rerun.
137 * @throws BaseEngineException thrown if the job could not be rerun.
138 */
139 public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException;
140
141 /**
142 * Return the info about a wf job.
143 *
144 * @param jobId job Id.
145 * @return the workflow job info.
146 * @throws DagEngineException thrown if the job info could not be obtained.
147 */
148 public abstract WorkflowJob getJob(String jobId) throws BaseEngineException;
149
150 /**
151 * Return the info about a wf job with actions subset.
152 *
153 * @param jobId job Id
154 * @param start starting from this index in the list of actions belonging to the job
155 * @param length number of actions to be returned
156 * @return the workflow job info.
157 * @throws DagEngineException thrown if the job info could not be obtained.
158 */
159 public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException;
160
161 /**
162 * Return the info about a coord job.
163 *
164 * @param jobId job Id.
165 * @return the coord job info.
166 * @throws BaseEngineException thrown if the job info could not be obtained.
167 */
168 public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException;
169
170 /**
171 * Return the info about a coord job with actions subset.
172 *
173 * @param jobId job Id.
174 * @param start starting from this index in the list of actions belonging to the job
175 * @param length number of actions to be returned
176 * @return the coord job info.
177 * @throws BaseEngineException thrown if the job info could not be obtained.
178 */
179 public abstract CoordinatorJob getCoordJob(String jobId, int start, int length) throws BaseEngineException;
180
181 /**
182 * Return the a job definition.
183 *
184 * @param jobId job Id.
185 * @return the job definition.
186 * @throws BaseEngineException thrown if the job definition could no be obtained.
187 */
188 public abstract String getDefinition(String jobId) throws BaseEngineException;
189
190 /**
191 * Stream the log of a job.
192 *
193 * @param jobId job Id.
194 * @param writer writer to stream the log to.
195 * @throws IOException thrown if the log cannot be streamed.
196 * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
197 * jobId.
198 */
199 public abstract void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException;
200
201 /**
202 * Return the workflow Job ID for an external ID.
203 * <p/>
204 * This is reverse lookup for recovery purposes.
205 *
206 * @param externalId external ID provided at job submission time.
207 * @return the associated workflow job ID if any, <code>null</code> if none.
208 * @throws BaseEngineException thrown if the lookup could not be done.
209 */
210 public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException;
211
212 public abstract String dryrunSubmit(Configuration conf, boolean startJob) throws BaseEngineException;
213
214 }