This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.IOException;
021 import java.io.Writer;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.Iterator;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.StringTokenizer;
031 import org.apache.hadoop.conf.Configuration;
032 import org.apache.oozie.client.CoordinatorJob;
033 import org.apache.oozie.client.OozieClient;
034 import org.apache.oozie.client.WorkflowJob;
035 import org.apache.oozie.client.rest.RestConstants;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.command.coord.CoordActionInfoCommand;
038 import org.apache.oozie.command.coord.CoordActionInfoXCommand;
039 import org.apache.oozie.util.CoordActionsInDateRange;
040 import org.apache.oozie.command.coord.CoordChangeCommand;
041 import org.apache.oozie.command.coord.CoordChangeXCommand;
042 import org.apache.oozie.command.coord.CoordJobCommand;
043 import org.apache.oozie.command.coord.CoordJobXCommand;
044 import org.apache.oozie.command.coord.CoordJobsCommand;
045 import org.apache.oozie.command.coord.CoordJobsXCommand;
046 import org.apache.oozie.command.coord.CoordKillCommand;
047 import org.apache.oozie.command.coord.CoordKillXCommand;
048 import org.apache.oozie.command.coord.CoordRerunCommand;
049 import org.apache.oozie.command.coord.CoordRerunXCommand;
050 import org.apache.oozie.command.coord.CoordResumeCommand;
051 import org.apache.oozie.command.coord.CoordResumeXCommand;
052 import org.apache.oozie.command.coord.CoordSubmitCommand;
053 import org.apache.oozie.command.coord.CoordSubmitXCommand;
054 import org.apache.oozie.command.coord.CoordSuspendCommand;
055 import org.apache.oozie.command.coord.CoordSuspendXCommand;
056 import org.apache.oozie.service.DagXLogInfoService;
057 import org.apache.oozie.service.Services;
058 import org.apache.oozie.service.XLogService;
059 import org.apache.oozie.util.ParamChecker;
060 import org.apache.oozie.util.XLog;
061 import org.apache.oozie.util.XLogStreamer;
062
063 public class CoordinatorEngine extends BaseEngine {
064 private static boolean useXCommand = true;
065 private static XLog LOG = XLog.getLog(CoordinatorEngine.class);
066
067 /**
068 * Create a system Coordinator engine, with no user and no group.
069 */
070 public CoordinatorEngine() {
071 if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
072 useXCommand = false;
073 LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
074 }
075 else {
076 LOG.debug("Oozie CoordinatorEngine is using XCommands.");
077 }
078 }
079
080 /**
081 * Create a Coordinator engine to perform operations on behave of a user.
082 *
083 * @param user user name.
084 * @param authToken the authentication token.
085 */
086 public CoordinatorEngine(String user, String authToken) {
087 this();
088 this.user = ParamChecker.notEmpty(user, "user");
089 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
090 }
091
092 /*
093 * (non-Javadoc)
094 *
095 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
096 */
097 @Override
098 public String getDefinition(String jobId) throws BaseEngineException {
099 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
100 return job.getOrigJobXml();
101 }
102
103 /**
104 * @param jobId
105 * @return CoordinatorJobBean
106 * @throws BaseEngineException
107 */
108 private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
109 try {
110 if (useXCommand) {
111 return new CoordJobXCommand(jobId).call();
112 }
113 else {
114 return new CoordJobCommand(jobId).call();
115 }
116 }
117 catch (CommandException ex) {
118 throw new BaseEngineException(ex);
119 }
120 }
121
122 /**
123 * @param actionId
124 * @return CoordinatorActionBean
125 * @throws BaseEngineException
126 */
127 public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
128 try {
129 if (useXCommand) {
130 return new CoordActionInfoXCommand(actionId).call();
131 }
132 else {
133 return new CoordActionInfoCommand(actionId).call();
134 }
135 }
136 catch (CommandException ex) {
137 throw new BaseEngineException(ex);
138 }
139 }
140
141 /*
142 * (non-Javadoc)
143 *
144 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
145 */
146 @Override
147 public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
148 try {
149 if (useXCommand) {
150 return new CoordJobXCommand(jobId).call();
151 }
152 else {
153 return new CoordJobCommand(jobId).call();
154 }
155 }
156 catch (CommandException ex) {
157 throw new BaseEngineException(ex);
158 }
159 }
160
161 /*
162 * (non-Javadoc)
163 *
164 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
165 */
166 @Override
167 public CoordinatorJobBean getCoordJob(String jobId, int start, int length) throws BaseEngineException {
168 try {
169 if (useXCommand) {
170 return new CoordJobXCommand(jobId, start, length).call();
171 }
172 else {
173 return new CoordJobCommand(jobId, start, length).call();
174 }
175 }
176 catch (CommandException ex) {
177 throw new BaseEngineException(ex);
178 }
179 }
180
181 /*
182 * (non-Javadoc)
183 *
184 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
185 */
186 @Override
187 public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
188 return null;
189 }
190
191 /*
192 * (non-Javadoc)
193 *
194 * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
195 */
196 @Override
197 public void kill(String jobId) throws CoordinatorEngineException {
198 try {
199 if (useXCommand) {
200 new CoordKillXCommand(jobId).call();
201 }
202 else {
203 new CoordKillCommand(jobId).call();
204 }
205 LOG.info("User " + user + " killed the Coordinator job " + jobId);
206 }
207 catch (CommandException e) {
208 throw new CoordinatorEngineException(e);
209 }
210 }
211
212 /* (non-Javadoc)
213 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
214 */
215 @Override
216 public void change(String jobId, String changeValue) throws CoordinatorEngineException {
217 try {
218 if (useXCommand) {
219 new CoordChangeXCommand(jobId, changeValue).call();
220 }
221 else {
222 new CoordChangeCommand(jobId, changeValue).call();
223 }
224 LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue);
225 }
226 catch (CommandException e) {
227 throw new CoordinatorEngineException(e);
228 }
229 }
230
231 @Override
232 @Deprecated
233 public void reRun(String jobId, Configuration conf) throws BaseEngineException {
234 throw new BaseEngineException(new XException(ErrorCode.E0301));
235 }
236
237 /**
238 * Rerun coordinator actions for given rerunType
239 *
240 * @param jobId
241 * @param rerunType
242 * @param scope
243 * @param refresh
244 * @param noCleanup
245 * @throws BaseEngineException
246 */
247 public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup)
248 throws BaseEngineException {
249 try {
250 if (useXCommand) {
251 return new CoordRerunXCommand(jobId, rerunType, scope, refresh, noCleanup).call();
252 }
253 else {
254 return new CoordRerunCommand(jobId, rerunType, scope, refresh, noCleanup).call();
255 }
256 }
257 catch (CommandException ex) {
258 throw new BaseEngineException(ex);
259 }
260 }
261
262 /*
263 * (non-Javadoc)
264 *
265 * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
266 */
267 @Override
268 public void resume(String jobId) throws CoordinatorEngineException {
269 try {
270 if (useXCommand) {
271 new CoordResumeXCommand(jobId).call();
272 }
273 else {
274 new CoordResumeCommand(jobId).call();
275 }
276 }
277 catch (CommandException e) {
278 throw new CoordinatorEngineException(e);
279 }
280 }
281
282 @Override
283 @Deprecated
284 public void start(String jobId) throws BaseEngineException {
285 throw new BaseEngineException(new XException(ErrorCode.E0301));
286 }
287
288 /*
289 * (non-Javadoc)
290 *
291 * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String,
292 * java.io.Writer)
293 */
294 @Override
295 public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException {
296 XLogStreamer.Filter filter = new XLogStreamer.Filter();
297 filter.setParameter(DagXLogInfoService.JOB, jobId);
298
299 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
300 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
301 }
302
303 /**
304 * Add list of actions to the filter based on conditions
305 *
306 * @param jobId Job Id
307 * @param logRetrievalScope Value for the retrieval type
308 * @param logRetrievalType Based on which filter criteria the log is retrieved
309 * @param writer writer to stream the log to
310 * @throws IOException
311 * @throws BaseEngineException
312 * @throws CommandException
313 */
314 public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer)
315 throws IOException, BaseEngineException, CommandException {
316 XLogStreamer.Filter filter = new XLogStreamer.Filter();
317 filter.setParameter(DagXLogInfoService.JOB, jobId);
318 if (logRetrievalScope != null && logRetrievalType != null) {
319 // if coordinator action logs are to be retrieved based on action id range
320 if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
321 Set<String> actions = new HashSet<String>();
322 String[] list = logRetrievalScope.split(",");
323 for (String s : list) {
324 s = s.trim();
325 if (s.contains("-")) {
326 String[] range = s.split("-");
327 if (range.length != 2) {
328 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
329 + "'");
330 }
331 int start;
332 int end;
333 try {
334 start = Integer.parseInt(range[0].trim());
335 end = Integer.parseInt(range[1].trim());
336 if (start > end) {
337 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
338 + "'");
339 }
340 }
341 catch (NumberFormatException ne) {
342 throw new CommandException(ErrorCode.E0302, ne);
343 }
344 for (int i = start; i <= end; i++) {
345 actions.add(jobId + "@" + i);
346 }
347 }
348 else {
349 try {
350 Integer.parseInt(s);
351 }
352 catch (NumberFormatException ne) {
353 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
354 + "'. Integer only.");
355 }
356 actions.add(jobId + "@" + s);
357 }
358 }
359
360 Iterator<String> actionsIterator = actions.iterator();
361 StringBuilder orSeparatedActions = new StringBuilder("");
362 boolean orRequired = false;
363 while (actionsIterator.hasNext()) {
364 if (orRequired) {
365 orSeparatedActions.append("|");
366 }
367 orSeparatedActions.append(actionsIterator.next().toString());
368 orRequired = true;
369 }
370 if (actions.size() > 1 && orRequired) {
371 orSeparatedActions.insert(0, "(");
372 orSeparatedActions.append(")");
373 }
374 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
375 }
376 // if coordinator action logs are to be retrieved based on date range
377 // this block gets the corresponding list of coordinator actions to be used by the log filter
378 if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
379 List<CoordinatorActionBean> actionsList = null;
380 try {
381 actionsList = CoordActionsInDateRange.getCoordActionsFromDates(jobId, logRetrievalScope);
382 }
383 catch (XException xe) {
384 throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
385 }
386 StringBuilder orSeparatedActions = new StringBuilder("");
387 boolean orRequired = false;
388 for (CoordinatorActionBean coordAction : actionsList) {
389 if (orRequired) {
390 orSeparatedActions.append("|");
391 }
392 orSeparatedActions.append(coordAction.getId());
393 orRequired = true;
394 }
395 if (actionsList.size() > 1 && orRequired) {
396 orSeparatedActions.insert(0, "(");
397 orSeparatedActions.append(")");
398 }
399 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
400 }
401 }
402 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
403 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
404 }
405
406 /*
407 * (non-Javadoc)
408 *
409 * @see
410 * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
411 * , boolean)
412 */
413 @Override
414 public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
415 try {
416 String jobId;
417 if (useXCommand) {
418 CoordSubmitXCommand submit = new CoordSubmitXCommand(conf, getAuthToken());
419 jobId = submit.call();
420 }
421 else {
422 CoordSubmitCommand submit = new CoordSubmitCommand(conf, getAuthToken());
423 jobId = submit.call();
424 }
425 return jobId;
426 }
427 catch (CommandException ex) {
428 throw new CoordinatorEngineException(ex);
429 }
430 }
431
432 /*
433 * (non-Javadoc)
434 *
435 * @see
436 * org.apache.oozie.BaseEngine#dryrunSubmit(org.apache.hadoop.conf.Configuration
437 * , boolean)
438 */
439 @Override
440 public String dryrunSubmit(Configuration conf, boolean startJob) throws CoordinatorEngineException {
441 try {
442 String jobId;
443 if (useXCommand) {
444 CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf, getAuthToken());
445 jobId = submit.call();
446 }
447 else {
448 CoordSubmitCommand submit = new CoordSubmitCommand(true, conf, getAuthToken());
449 jobId = submit.call();
450 }
451 return jobId;
452 }
453 catch (CommandException ex) {
454 throw new CoordinatorEngineException(ex);
455 }
456 }
457
458 /*
459 * (non-Javadoc)
460 *
461 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
462 */
463 @Override
464 public void suspend(String jobId) throws CoordinatorEngineException {
465 try {
466 if (useXCommand) {
467 new CoordSuspendXCommand(jobId).call();
468 }
469 else {
470 new CoordSuspendCommand(jobId).call();
471 }
472 }
473 catch (CommandException e) {
474 throw new CoordinatorEngineException(e);
475 }
476
477 }
478
479 /*
480 * (non-Javadoc)
481 *
482 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
483 */
484 @Override
485 public WorkflowJob getJob(String jobId) throws BaseEngineException {
486 throw new BaseEngineException(new XException(ErrorCode.E0301));
487 }
488
489 /*
490 * (non-Javadoc)
491 *
492 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
493 */
494 @Override
495 public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
496 throw new BaseEngineException(new XException(ErrorCode.E0301));
497 }
498
499 private static final Set<String> FILTER_NAMES = new HashSet<String>();
500
501 static {
502 FILTER_NAMES.add(OozieClient.FILTER_USER);
503 FILTER_NAMES.add(OozieClient.FILTER_NAME);
504 FILTER_NAMES.add(OozieClient.FILTER_GROUP);
505 FILTER_NAMES.add(OozieClient.FILTER_STATUS);
506 FILTER_NAMES.add(OozieClient.FILTER_ID);
507 FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
508 }
509
510 /**
511 * @param filterStr
512 * @param start
513 * @param len
514 * @return CoordinatorJobInfo
515 * @throws CoordinatorEngineException
516 */
517 public CoordinatorJobInfo getCoordJobs(String filterStr, int start, int len) throws CoordinatorEngineException {
518 Map<String, List<String>> filter = parseFilter(filterStr);
519
520 try {
521 if (useXCommand) {
522 return new CoordJobsXCommand(filter, start, len).call();
523 }
524 else {
525 return new CoordJobsCommand(filter, start, len).call();
526 }
527 }
528 catch (CommandException ex) {
529 throw new CoordinatorEngineException(ex);
530 }
531 }
532
533 /**
534 * @param filter
535 * @return Map<String, List<String>>
536 * @throws CoordinatorEngineException
537 */
538 private Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException {
539 Map<String, List<String>> map = new HashMap<String, List<String>>();
540 if (filter != null) {
541 StringTokenizer st = new StringTokenizer(filter, ";");
542 while (st.hasMoreTokens()) {
543 String token = st.nextToken();
544 if (token.contains("=")) {
545 String[] pair = token.split("=");
546 if (pair.length != 2) {
547 throw new CoordinatorEngineException(ErrorCode.E0420, filter,
548 "elements must be name=value pairs");
549 }
550 if (!FILTER_NAMES.contains(pair[0])) {
551 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
552 pair[0]));
553 }
554 if (pair[0].equals("status")) {
555 try {
556 CoordinatorJob.Status.valueOf(pair[1]);
557 }
558 catch (IllegalArgumentException ex) {
559 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
560 "invalid status [{0}]", pair[1]));
561 }
562 }
563 List<String> list = map.get(pair[0]);
564 if (list == null) {
565 list = new ArrayList<String>();
566 map.put(pair[0], list);
567 }
568 list.add(pair[1]);
569 }
570 else {
571 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
572 }
573 }
574 }
575 return map;
576 }
577 }