This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.IOException;
021 import java.io.Writer;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Set;
029 import java.util.StringTokenizer;
030
031 import org.apache.hadoop.conf.Configuration;
032 import org.apache.oozie.client.CoordinatorJob;
033 import org.apache.oozie.client.Job;
034 import org.apache.oozie.client.OozieClient;
035 import org.apache.oozie.client.WorkflowJob;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
038 import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
039 import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
040 import org.apache.oozie.command.bundle.BundleJobXCommand;
041 import org.apache.oozie.command.bundle.BundleJobsXCommand;
042 import org.apache.oozie.command.bundle.BundleKillXCommand;
043 import org.apache.oozie.command.bundle.BundleRerunXCommand;
044 import org.apache.oozie.command.bundle.BundleStartXCommand;
045 import org.apache.oozie.command.bundle.BundleSubmitXCommand;
046 import org.apache.oozie.service.DagXLogInfoService;
047 import org.apache.oozie.service.Services;
048 import org.apache.oozie.service.XLogService;
049 import org.apache.oozie.util.ParamChecker;
050 import org.apache.oozie.util.XLog;
051 import org.apache.oozie.util.XLogStreamer;
052
053 public class BundleEngine extends BaseEngine {
054 /**
055 * Create a system Bundle engine, with no user and no group.
056 */
057 public BundleEngine() {
058 }
059
060 /**
061 * Create a Bundle engine to perform operations on behave of a user.
062 *
063 * @param user user name.
064 * @param authToken the authentication token.
065 */
066 public BundleEngine(String user, String authToken) {
067 this.user = ParamChecker.notEmpty(user, "user");
068 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
069 }
070
071 /* (non-Javadoc)
072 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
073 */
074 @Override
075 public void change(String jobId, String changeValue) throws BundleEngineException {
076 try {
077 BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue);
078 change.call();
079 }
080 catch (CommandException ex) {
081 throw new BundleEngineException(ex);
082 }
083 }
084
085 /* (non-Javadoc)
086 * @see org.apache.oozie.BaseEngine#dryrunSubmit(org.apache.hadoop.conf.Configuration, boolean)
087 */
088 @Override
089 public String dryrunSubmit(Configuration conf, boolean startJob) throws BundleEngineException {
090 BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf, getAuthToken());
091 try {
092 String jobId = submit.call();
093 return jobId;
094 }
095 catch (CommandException ex) {
096 throw new BundleEngineException(ex);
097 }
098 }
099
100 /* (non-Javadoc)
101 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
102 */
103 @Override
104 public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
105 throw new BundleEngineException(new XException(ErrorCode.E0301));
106 }
107
108 public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
109 try {
110 return new BundleJobXCommand(jobId).call();
111 }
112 catch (CommandException ex) {
113 throw new BundleEngineException(ex);
114 }
115 }
116
117 /* (non-Javadoc)
118 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
119 */
120 @Override
121 public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length) throws BundleEngineException {
122 throw new BundleEngineException(new XException(ErrorCode.E0301));
123 }
124
125 /* (non-Javadoc)
126 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
127 */
128 @Override
129 public String getDefinition(String jobId) throws BundleEngineException {
130 BundleJobBean job;
131 try {
132 job = new BundleJobXCommand(jobId).call();
133 }
134 catch (CommandException ex) {
135 throw new BundleEngineException(ex);
136 }
137 return job.getOrigJobXml();
138 }
139
140 /* (non-Javadoc)
141 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
142 */
143 @Override
144 public WorkflowJob getJob(String jobId) throws BundleEngineException {
145 throw new BundleEngineException(new XException(ErrorCode.E0301));
146 }
147
148 /* (non-Javadoc)
149 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
150 */
151 @Override
152 public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
153 throw new BundleEngineException(new XException(ErrorCode.E0301));
154 }
155
156 /* (non-Javadoc)
157 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
158 */
159 @Override
160 public String getJobIdForExternalId(String externalId) throws BundleEngineException {
161 return null;
162 }
163
164 /* (non-Javadoc)
165 * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
166 */
167 @Override
168 public void kill(String jobId) throws BundleEngineException {
169 try {
170 new BundleKillXCommand(jobId).call();
171 }
172 catch (CommandException e) {
173 throw new BundleEngineException(e);
174 }
175 }
176
177 /* (non-Javadoc)
178 * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
179 */
180 @Override
181 @Deprecated
182 public void reRun(String jobId, Configuration conf) throws BundleEngineException {
183 throw new BundleEngineException(new XException(ErrorCode.E0301));
184 }
185
186 /**
187 * Rerun Bundle actions for given rerunType
188 *
189 * @param jobId bundle job id
190 * @param coordScope the rerun scope for coordinator job names separated by ","
191 * @param dateScope the rerun scope for coordinator nominal times separated by ","
192 * @param refresh true if user wants to refresh input/outpur dataset urls
193 * @param noCleanup false if user wants to cleanup output events for given rerun actions
194 * @throws BaseEngineException thrown if failed to rerun
195 */
196 public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
197 throws BaseEngineException {
198 try {
199 new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
200 }
201 catch (CommandException ex) {
202 throw new BaseEngineException(ex);
203 }
204 }
205
206 /* (non-Javadoc)
207 * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
208 */
209 @Override
210 public void resume(String jobId) throws BundleEngineException {
211 BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
212 try {
213 resume.call();
214 }
215 catch (CommandException ex) {
216 throw new BundleEngineException(ex);
217 }
218 }
219
220 /* (non-Javadoc)
221 * @see org.apache.oozie.BaseEngine#start(java.lang.String)
222 */
223 @Override
224 public void start(String jobId) throws BundleEngineException {
225 try {
226 new BundleStartXCommand(jobId).call();
227 }
228 catch (CommandException e) {
229 throw new BundleEngineException(e);
230 }
231 }
232
233 /* (non-Javadoc)
234 * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer)
235 */
236 @Override
237 public void streamLog(String jobId, Writer writer) throws IOException, BundleEngineException {
238 XLogStreamer.Filter filter = new XLogStreamer.Filter();
239 filter.setParameter(DagXLogInfoService.JOB, jobId);
240
241 BundleJobBean job;
242 try {
243 job = new BundleJobXCommand(jobId).call();
244 }
245 catch (CommandException ex) {
246 throw new BundleEngineException(ex);
247 }
248
249 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
250 }
251
252 /* (non-Javadoc)
253 * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
254 */
255 @Override
256 public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
257 try {
258 String jobId = new BundleSubmitXCommand(conf, getAuthToken()).call();
259
260 if (startJob) {
261 start(jobId);
262 }
263 return jobId;
264 }
265 catch (CommandException ex) {
266 throw new BundleEngineException(ex);
267 }
268 }
269
270 /* (non-Javadoc)
271 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
272 */
273 @Override
274 public void suspend(String jobId) throws BundleEngineException {
275 BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
276 try {
277 suspend.call();
278 }
279 catch (CommandException ex) {
280 throw new BundleEngineException(ex);
281 }
282 }
283
284 private static final Set<String> FILTER_NAMES = new HashSet<String>();
285
286 static {
287 FILTER_NAMES.add(OozieClient.FILTER_USER);
288 FILTER_NAMES.add(OozieClient.FILTER_NAME);
289 FILTER_NAMES.add(OozieClient.FILTER_GROUP);
290 FILTER_NAMES.add(OozieClient.FILTER_STATUS);
291 FILTER_NAMES.add(OozieClient.FILTER_ID);
292 }
293
294 /**
295 * Get bundle jobs
296 *
297 * @param filter the filter string
298 * @param start start location for paging
299 * @param len total length to get
300 * @return bundle job info
301 * @throws BundleEngineException thrown if failed to get bundle job info
302 */
303 public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
304 Map<String, List<String>> filterList = parseFilter(filter);
305
306 try {
307 return new BundleJobsXCommand(filterList, start, len).call();
308 }
309 catch (CommandException ex) {
310 throw new BundleEngineException(ex);
311 }
312 }
313
314 /**
315 * Parse filter string to a map with key = filter name and values = filter values
316 *
317 * @param filter the filter string
318 * @return filter key and value map
319 * @throws CoordinatorEngineException thrown if failed to parse filter string
320 */
321 private Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
322 Map<String, List<String>> map = new HashMap<String, List<String>>();
323 if (filter != null) {
324 StringTokenizer st = new StringTokenizer(filter, ";");
325 while (st.hasMoreTokens()) {
326 String token = st.nextToken();
327 if (token.contains("=")) {
328 String[] pair = token.split("=");
329 if (pair.length != 2) {
330 throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
331 }
332 if (!FILTER_NAMES.contains(pair[0])) {
333 throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
334 pair[0]));
335 }
336 if (pair[0].equals("status")) {
337 try {
338 Job.Status.valueOf(pair[1]);
339 }
340 catch (IllegalArgumentException ex) {
341 throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format(
342 "invalid status [{0}]", pair[1]));
343 }
344 }
345 List<String> list = map.get(pair[0]);
346 if (list == null) {
347 list = new ArrayList<String>();
348 map.put(pair[0], list);
349 }
350 list.add(pair[1]);
351 }
352 else {
353 throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
354 }
355 }
356 }
357 return map;
358 }
359
360 }