This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.IOException;
021 import java.io.Writer;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.Iterator;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.StringTokenizer;
031 import org.apache.hadoop.conf.Configuration;
032 import org.apache.oozie.client.CoordinatorAction;
033 import org.apache.oozie.client.CoordinatorJob;
034 import org.apache.oozie.client.OozieClient;
035 import org.apache.oozie.client.WorkflowJob;
036 import org.apache.oozie.client.rest.RestConstants;
037 import org.apache.oozie.command.CommandException;
038 import org.apache.oozie.command.coord.CoordActionInfoXCommand;
039 import org.apache.oozie.util.CoordActionsInDateRange;
040 import org.apache.oozie.command.coord.CoordChangeXCommand;
041 import org.apache.oozie.command.coord.CoordJobXCommand;
042 import org.apache.oozie.command.coord.CoordJobsXCommand;
043 import org.apache.oozie.command.coord.CoordKillXCommand;
044 import org.apache.oozie.command.coord.CoordRerunXCommand;
045 import org.apache.oozie.command.coord.CoordResumeXCommand;
046 import org.apache.oozie.command.coord.CoordSubmitXCommand;
047 import org.apache.oozie.command.coord.CoordSuspendXCommand;
048 import org.apache.oozie.service.DagXLogInfoService;
049 import org.apache.oozie.service.Services;
050 import org.apache.oozie.service.XLogService;
051 import org.apache.oozie.util.ParamChecker;
052 import org.apache.oozie.util.XLog;
053 import org.apache.oozie.util.XLogStreamer;
054
055 public class CoordinatorEngine extends BaseEngine {
056 private static XLog LOG = XLog.getLog(CoordinatorEngine.class);
057
058 /**
059 * Create a system Coordinator engine, with no user and no group.
060 */
061 public CoordinatorEngine() {
062 if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
063 LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
064 }
065 else {
066 LOG.debug("Oozie CoordinatorEngine is using XCommands.");
067 }
068 }
069
070 /**
071 * Create a Coordinator engine to perform operations on behave of a user.
072 *
073 * @param user user name.
074 * @param authToken the authentication token.
075 */
076 public CoordinatorEngine(String user, String authToken) {
077 this();
078 this.user = ParamChecker.notEmpty(user, "user");
079 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
080 }
081
082 /*
083 * (non-Javadoc)
084 *
085 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
086 */
087 @Override
088 public String getDefinition(String jobId) throws BaseEngineException {
089 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
090 return job.getOrigJobXml();
091 }
092
093 /**
094 * @param jobId
095 * @return CoordinatorJobBean
096 * @throws BaseEngineException
097 */
098 private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
099 try {
100 return new CoordJobXCommand(jobId).call();
101 }
102 catch (CommandException ex) {
103 throw new BaseEngineException(ex);
104 }
105 }
106
107 /**
108 * @param actionId
109 * @return CoordinatorActionBean
110 * @throws BaseEngineException
111 */
112 public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
113 try {
114 return new CoordActionInfoXCommand(actionId).call();
115 }
116 catch (CommandException ex) {
117 throw new BaseEngineException(ex);
118 }
119 }
120
121 /*
122 * (non-Javadoc)
123 *
124 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
125 */
126 @Override
127 public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
128 try {
129 return new CoordJobXCommand(jobId).call();
130 }
131 catch (CommandException ex) {
132 throw new BaseEngineException(ex);
133 }
134 }
135
136 /*
137 * (non-Javadoc)
138 *
139 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
140 */
141 @Override
142 public CoordinatorJobBean getCoordJob(String jobId, String filter, int start, int length) throws BaseEngineException {
143 List<String> filterList = parseStatusFilter(filter);
144 try {
145 return new CoordJobXCommand(jobId, filterList, start, length)
146 .call();
147 }
148 catch (CommandException ex) {
149 throw new BaseEngineException(ex);
150 }
151 }
152
153 /*
154 * (non-Javadoc)
155 *
156 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
157 */
158 @Override
159 public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
160 return null;
161 }
162
163 /*
164 * (non-Javadoc)
165 *
166 * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
167 */
168 @Override
169 public void kill(String jobId) throws CoordinatorEngineException {
170 try {
171 new CoordKillXCommand(jobId).call();
172 LOG.info("User " + user + " killed the Coordinator job " + jobId);
173 }
174 catch (CommandException e) {
175 throw new CoordinatorEngineException(e);
176 }
177 }
178
179 /* (non-Javadoc)
180 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
181 */
182 @Override
183 public void change(String jobId, String changeValue) throws CoordinatorEngineException {
184 try {
185 new CoordChangeXCommand(jobId, changeValue).call();
186 LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue);
187 }
188 catch (CommandException e) {
189 throw new CoordinatorEngineException(e);
190 }
191 }
192
193 @Override
194 @Deprecated
195 public void reRun(String jobId, Configuration conf) throws BaseEngineException {
196 throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun"));
197 }
198
199 /**
200 * Rerun coordinator actions for given rerunType
201 *
202 * @param jobId
203 * @param rerunType
204 * @param scope
205 * @param refresh
206 * @param noCleanup
207 * @throws BaseEngineException
208 */
209 public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup)
210 throws BaseEngineException {
211 try {
212 return new CoordRerunXCommand(jobId, rerunType, scope, refresh,
213 noCleanup).call();
214 }
215 catch (CommandException ex) {
216 throw new BaseEngineException(ex);
217 }
218 }
219
220 /*
221 * (non-Javadoc)
222 *
223 * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
224 */
225 @Override
226 public void resume(String jobId) throws CoordinatorEngineException {
227 try {
228 new CoordResumeXCommand(jobId).call();
229 }
230 catch (CommandException e) {
231 throw new CoordinatorEngineException(e);
232 }
233 }
234
235 @Override
236 @Deprecated
237 public void start(String jobId) throws BaseEngineException {
238 throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
239 }
240
241 /*
242 * (non-Javadoc)
243 *
244 * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String,
245 * java.io.Writer)
246 */
247 @Override
248 public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException {
249 XLogStreamer.Filter filter = new XLogStreamer.Filter();
250 filter.setParameter(DagXLogInfoService.JOB, jobId);
251
252 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
253 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
254 }
255
256 /**
257 * Add list of actions to the filter based on conditions
258 *
259 * @param jobId Job Id
260 * @param logRetrievalScope Value for the retrieval type
261 * @param logRetrievalType Based on which filter criteria the log is retrieved
262 * @param writer writer to stream the log to
263 * @throws IOException
264 * @throws BaseEngineException
265 * @throws CommandException
266 */
267 public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer)
268 throws IOException, BaseEngineException, CommandException {
269 XLogStreamer.Filter filter = new XLogStreamer.Filter();
270 filter.setParameter(DagXLogInfoService.JOB, jobId);
271 if (logRetrievalScope != null && logRetrievalType != null) {
272 // if coordinator action logs are to be retrieved based on action id range
273 if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
274 Set<String> actions = new HashSet<String>();
275 String[] list = logRetrievalScope.split(",");
276 for (String s : list) {
277 s = s.trim();
278 if (s.contains("-")) {
279 String[] range = s.split("-");
280 if (range.length != 2) {
281 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
282 + "'");
283 }
284 int start;
285 int end;
286 try {
287 start = Integer.parseInt(range[0].trim());
288 } catch (NumberFormatException ne) {
289 throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer",
290 ne);
291 }
292 try {
293 end = Integer.parseInt(range[1].trim());
294 } catch (NumberFormatException ne) {
295 throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer",
296 ne);
297 }
298 if (start > end) {
299 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
300 }
301 for (int i = start; i <= end; i++) {
302 actions.add(jobId + "@" + i);
303 }
304 }
305 else {
306 try {
307 Integer.parseInt(s);
308 }
309 catch (NumberFormatException ne) {
310 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
311 + "'. Integer only.");
312 }
313 actions.add(jobId + "@" + s);
314 }
315 }
316
317 Iterator<String> actionsIterator = actions.iterator();
318 StringBuilder orSeparatedActions = new StringBuilder("");
319 boolean orRequired = false;
320 while (actionsIterator.hasNext()) {
321 if (orRequired) {
322 orSeparatedActions.append("|");
323 }
324 orSeparatedActions.append(actionsIterator.next().toString());
325 orRequired = true;
326 }
327 if (actions.size() > 1 && orRequired) {
328 orSeparatedActions.insert(0, "(");
329 orSeparatedActions.append(")");
330 }
331 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
332 }
333 // if coordinator action logs are to be retrieved based on date range
334 // this block gets the corresponding list of coordinator actions to be used by the log filter
335 if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
336 List<String> coordActionIdList = null;
337 try {
338 coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope);
339 }
340 catch (XException xe) {
341 throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
342 }
343 StringBuilder orSeparatedActions = new StringBuilder("");
344 boolean orRequired = false;
345 for (String coordActionId : coordActionIdList) {
346 if (orRequired) {
347 orSeparatedActions.append("|");
348 }
349 orSeparatedActions.append(coordActionId);
350 orRequired = true;
351 }
352 if (coordActionIdList.size() > 1 && orRequired) {
353 orSeparatedActions.insert(0, "(");
354 orSeparatedActions.append(")");
355 }
356 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
357 }
358 }
359 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
360 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
361 }
362
363 /*
364 * (non-Javadoc)
365 *
366 * @see
367 * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
368 * , boolean)
369 */
370 @Override
371 public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
372 try {
373 CoordSubmitXCommand submit = new CoordSubmitXCommand(conf,
374 getAuthToken());
375 return submit.call();
376 }
377 catch (CommandException ex) {
378 throw new CoordinatorEngineException(ex);
379 }
380 }
381
382 /*
383 * (non-Javadoc)
384 *
385 * @see
386 * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
387 */
388 @Override
389 public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException {
390 try {
391 CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf,
392 getAuthToken());
393 return submit.call();
394 }
395 catch (CommandException ex) {
396 throw new CoordinatorEngineException(ex);
397 }
398 }
399
400 /*
401 * (non-Javadoc)
402 *
403 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
404 */
405 @Override
406 public void suspend(String jobId) throws CoordinatorEngineException {
407 try {
408 new CoordSuspendXCommand(jobId).call();
409 }
410 catch (CommandException e) {
411 throw new CoordinatorEngineException(e);
412 }
413
414 }
415
416 /*
417 * (non-Javadoc)
418 *
419 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
420 */
421 @Override
422 public WorkflowJob getJob(String jobId) throws BaseEngineException {
423 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
424 }
425
426 /*
427 * (non-Javadoc)
428 *
429 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
430 */
431 @Override
432 public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
433 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
434 }
435
436 private static final Set<String> FILTER_NAMES = new HashSet<String>();
437
438 static {
439 FILTER_NAMES.add(OozieClient.FILTER_USER);
440 FILTER_NAMES.add(OozieClient.FILTER_NAME);
441 FILTER_NAMES.add(OozieClient.FILTER_GROUP);
442 FILTER_NAMES.add(OozieClient.FILTER_STATUS);
443 FILTER_NAMES.add(OozieClient.FILTER_ID);
444 FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
445 FILTER_NAMES.add(OozieClient.FILTER_UNIT);
446 }
447
448 /**
449 * @param filter
450 * @param start
451 * @param len
452 * @return CoordinatorJobInfo
453 * @throws CoordinatorEngineException
454 */
455 public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
456 Map<String, List<String>> filterList = parseFilter(filter);
457
458 try {
459 return new CoordJobsXCommand(filterList, start, len).call();
460 }
461 catch (CommandException ex) {
462 throw new CoordinatorEngineException(ex);
463 }
464 }
465
466
467 // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values
468 private List<String> parseStatusFilter(String filter) throws CoordinatorEngineException {
469 List<String> filterList = new ArrayList<String>();
470 if (filter != null) {
471 //split name;value pairs
472 StringTokenizer st = new StringTokenizer(filter, ";");
473 while (st.hasMoreTokens()) {
474 String token = st.nextToken();
475 if (token.contains("=")) {
476 String[] pair = token.split("=");
477 if (pair.length != 2) {
478 throw new CoordinatorEngineException(ErrorCode.E0421, token,
479 "elements must be name=value pairs");
480 }
481 if (pair[0].equalsIgnoreCase("status")) {
482 String statusValue = pair[1];
483 try {
484 CoordinatorAction.Status.valueOf(statusValue);
485 } catch (IllegalArgumentException ex) {
486 StringBuilder validStatusList = new StringBuilder();
487 for (CoordinatorAction.Status status: CoordinatorAction.Status.values()){
488 validStatusList.append(status.toString()+" ");
489 }
490 // Check for incorrect status value
491 throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
492 "invalid status value [{0}]." + " Valid status values are: [{1}]", statusValue, validStatusList));
493 }
494 filterList.add(statusValue);
495 } else {
496 // Check for incorrect filter option
497 throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
498 "invalid filter [{0}]." + " The only valid filter is \"status\"", pair[0]));
499 }
500 } else {
501 throw new CoordinatorEngineException(ErrorCode.E0421, token,
502 "elements must be name=value pairs");
503 }
504 }
505 }
506 return filterList;
507 }
508
509 /**
510 * @param filter
511 * @return Map<String, List<String>>
512 * @throws CoordinatorEngineException
513 */
514 private Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException {
515 Map<String, List<String>> map = new HashMap<String, List<String>>();
516 boolean isTimeUnitSpecified = false;
517 String timeUnit = "MINUTE";
518 boolean isFrequencySpecified = false;
519 String frequency = "";
520 if (filter != null) {
521 StringTokenizer st = new StringTokenizer(filter, ";");
522 while (st.hasMoreTokens()) {
523 String token = st.nextToken();
524 if (token.contains("=")) {
525 String[] pair = token.split("=");
526 if (pair.length != 2) {
527 throw new CoordinatorEngineException(ErrorCode.E0420, filter,
528 "elements must be name=value pairs");
529 }
530 if (!FILTER_NAMES.contains(pair[0].toLowerCase())) {
531 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
532 pair[0]));
533 }
534 if (pair[0].equalsIgnoreCase("frequency")) {
535 isFrequencySpecified = true;
536 try {
537 frequency = (int) Float.parseFloat(pair[1]) + "";
538 continue;
539 }
540 catch (NumberFormatException NANException) {
541 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
542 "invalid value [{0}] for frequency. A numerical value is expected", pair[1]));
543 }
544 }
545 if (pair[0].equalsIgnoreCase("unit")) {
546 isTimeUnitSpecified = true;
547 timeUnit = pair[1];
548 if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
549 && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
550 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
551 "invalid value [{0}] for time unit. "
552 + "Valid value is one of months, days, hours or minutes", pair[1]));
553 }
554 continue;
555 }
556 if (pair[0].equals("status")) {
557 try {
558 CoordinatorJob.Status.valueOf(pair[1]);
559 }
560 catch (IllegalArgumentException ex) {
561 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
562 "invalid status [{0}]", pair[1]));
563 }
564 }
565 List<String> list = map.get(pair[0]);
566 if (list == null) {
567 list = new ArrayList<String>();
568 map.put(pair[0], list);
569 }
570 list.add(pair[1]);
571 } else {
572 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
573 }
574 }
575 // Unit is specified and frequency is not specified
576 if (!isFrequencySpecified && isTimeUnitSpecified) {
577 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
578 + "frequency is specified. Either specify frequency also or else remove the time unit");
579 } else if (isFrequencySpecified) {
580 // Frequency value is specified
581 if (isTimeUnitSpecified) {
582 if (timeUnit.equalsIgnoreCase("months")) {
583 timeUnit = "MONTH";
584 } else if (timeUnit.equalsIgnoreCase("days")) {
585 timeUnit = "DAY";
586 } else if (timeUnit.equalsIgnoreCase("hours")) {
587 // When job details are persisted to database, frequency in hours are converted to minutes.
588 // This conversion is to conform with that.
589 frequency = Integer.parseInt(frequency) * 60 + "";
590 timeUnit = "MINUTE";
591 } else if (timeUnit.equalsIgnoreCase("minutes")) {
592 timeUnit = "MINUTE";
593 }
594 }
595 // Adding the frequency and time unit filters to the filter map
596 List<String> list = new ArrayList<String>();
597 list.add(timeUnit);
598 map.put("unit", list);
599 list = new ArrayList<String>();
600 list.add(frequency);
601 map.put("frequency", list);
602 }
603 }
604 return map;
605 }
606 }