This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.IOException;
021 import java.io.Writer;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.Iterator;
027 import java.util.LinkedHashSet;
028 import java.util.List;
029 import java.util.Map;
030 import java.util.Set;
031 import java.util.StringTokenizer;
032
033 import org.apache.hadoop.conf.Configuration;
034 import org.apache.oozie.client.CoordinatorAction;
035 import org.apache.oozie.client.CoordinatorJob;
036 import org.apache.oozie.client.OozieClient;
037 import org.apache.oozie.client.WorkflowJob;
038 import org.apache.oozie.client.rest.RestConstants;
039 import org.apache.oozie.command.CommandException;
040 import org.apache.oozie.command.coord.CoordActionInfoXCommand;
041 import org.apache.oozie.util.CoordActionsInDateRange;
042 import org.apache.oozie.command.coord.CoordChangeXCommand;
043 import org.apache.oozie.command.coord.CoordJobXCommand;
044 import org.apache.oozie.command.coord.CoordJobsXCommand;
045 import org.apache.oozie.command.coord.CoordKillXCommand;
046 import org.apache.oozie.command.coord.CoordRerunXCommand;
047 import org.apache.oozie.command.coord.CoordResumeXCommand;
048 import org.apache.oozie.command.coord.CoordSubmitXCommand;
049 import org.apache.oozie.command.coord.CoordSuspendXCommand;
050 import org.apache.oozie.service.DagXLogInfoService;
051 import org.apache.oozie.service.Services;
052 import org.apache.oozie.service.XLogService;
053 import org.apache.oozie.util.ParamChecker;
054 import org.apache.oozie.util.XLog;
055 import org.apache.oozie.util.XLogStreamer;
056
057 import com.google.common.annotations.VisibleForTesting;
058
059 public class CoordinatorEngine extends BaseEngine {
060 private static XLog LOG = XLog.getLog(CoordinatorEngine.class);
061
062 /**
063 * Create a system Coordinator engine, with no user and no group.
064 */
065 public CoordinatorEngine() {
066 if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
067 LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
068 }
069 else {
070 LOG.debug("Oozie CoordinatorEngine is using XCommands.");
071 }
072 }
073
074 /**
075 * Create a Coordinator engine to perform operations on behave of a user.
076 *
077 * @param user user name.
078 */
079 public CoordinatorEngine(String user) {
080 this();
081 this.user = ParamChecker.notEmpty(user, "user");
082 }
083
084 /*
085 * (non-Javadoc)
086 *
087 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
088 */
089 @Override
090 public String getDefinition(String jobId) throws BaseEngineException {
091 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
092 return job.getOrigJobXml();
093 }
094
095 /**
096 * @param jobId
097 * @return CoordinatorJobBean
098 * @throws BaseEngineException
099 */
100 private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
101 try {
102 return new CoordJobXCommand(jobId).call();
103 }
104 catch (CommandException ex) {
105 throw new BaseEngineException(ex);
106 }
107 }
108
109 /**
110 * @param actionId
111 * @return CoordinatorActionBean
112 * @throws BaseEngineException
113 */
114 public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
115 try {
116 return new CoordActionInfoXCommand(actionId).call();
117 }
118 catch (CommandException ex) {
119 throw new BaseEngineException(ex);
120 }
121 }
122
123 /*
124 * (non-Javadoc)
125 *
126 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
127 */
128 @Override
129 public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
130 try {
131 return new CoordJobXCommand(jobId).call();
132 }
133 catch (CommandException ex) {
134 throw new BaseEngineException(ex);
135 }
136 }
137
138 /*
139 * (non-Javadoc)
140 *
141 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
142 */
143 @Override
144 public CoordinatorJobBean getCoordJob(String jobId, String filter, int start, int length, boolean desc)
145 throws BaseEngineException {
146 List<String> filterList = parseStatusFilter(filter);
147 try {
148 return new CoordJobXCommand(jobId, filterList, start, length, desc)
149 .call();
150 }
151 catch (CommandException ex) {
152 throw new BaseEngineException(ex);
153 }
154 }
155
156 /*
157 * (non-Javadoc)
158 *
159 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
160 */
161 @Override
162 public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
163 return null;
164 }
165
166 /*
167 * (non-Javadoc)
168 *
169 * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
170 */
171 @Override
172 public void kill(String jobId) throws CoordinatorEngineException {
173 try {
174 new CoordKillXCommand(jobId).call();
175 LOG.info("User " + user + " killed the Coordinator job " + jobId);
176 }
177 catch (CommandException e) {
178 throw new CoordinatorEngineException(e);
179 }
180 }
181
182 /* (non-Javadoc)
183 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
184 */
185 @Override
186 public void change(String jobId, String changeValue) throws CoordinatorEngineException {
187 try {
188 new CoordChangeXCommand(jobId, changeValue).call();
189 LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue);
190 }
191 catch (CommandException e) {
192 throw new CoordinatorEngineException(e);
193 }
194 }
195
196 @Override
197 @Deprecated
198 public void reRun(String jobId, Configuration conf) throws BaseEngineException {
199 throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun"));
200 }
201
202 /**
203 * Rerun coordinator actions for given rerunType
204 *
205 * @param jobId
206 * @param rerunType
207 * @param scope
208 * @param refresh
209 * @param noCleanup
210 * @throws BaseEngineException
211 */
212 public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup)
213 throws BaseEngineException {
214 try {
215 return new CoordRerunXCommand(jobId, rerunType, scope, refresh,
216 noCleanup).call();
217 }
218 catch (CommandException ex) {
219 throw new BaseEngineException(ex);
220 }
221 }
222
223 /*
224 * (non-Javadoc)
225 *
226 * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
227 */
228 @Override
229 public void resume(String jobId) throws CoordinatorEngineException {
230 try {
231 new CoordResumeXCommand(jobId).call();
232 }
233 catch (CommandException e) {
234 throw new CoordinatorEngineException(e);
235 }
236 }
237
238 @Override
239 @Deprecated
240 public void start(String jobId) throws BaseEngineException {
241 throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
242 }
243
244 /*
245 * (non-Javadoc)
246 *
247 * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String,
248 * java.io.Writer)
249 */
250 @Override
251 public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException {
252 XLogStreamer.Filter filter = new XLogStreamer.Filter();
253 filter.setParameter(DagXLogInfoService.JOB, jobId);
254
255 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
256 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
257 }
258
259 /**
260 * Add list of actions to the filter based on conditions
261 *
262 * @param jobId Job Id
263 * @param logRetrievalScope Value for the retrieval type
264 * @param logRetrievalType Based on which filter criteria the log is retrieved
265 * @param writer writer to stream the log to
266 * @throws IOException
267 * @throws BaseEngineException
268 * @throws CommandException
269 */
270 public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer)
271 throws IOException, BaseEngineException, CommandException {
272 XLogStreamer.Filter filter = new XLogStreamer.Filter();
273 filter.setParameter(DagXLogInfoService.JOB, jobId);
274 if (logRetrievalScope != null && logRetrievalType != null) {
275 // if coordinator action logs are to be retrieved based on action id range
276 if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
277 // Use set implementation that maintains order or elements to achieve reproducibility:
278 Set<String> actionSet = new LinkedHashSet<String>();
279 String[] list = logRetrievalScope.split(",");
280 for (String s : list) {
281 s = s.trim();
282 if (s.contains("-")) {
283 String[] range = s.split("-");
284 if (range.length != 2) {
285 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
286 + "'");
287 }
288 int start;
289 int end;
290 try {
291 start = Integer.parseInt(range[0].trim());
292 } catch (NumberFormatException ne) {
293 throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer",
294 ne);
295 }
296 try {
297 end = Integer.parseInt(range[1].trim());
298 } catch (NumberFormatException ne) {
299 throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer",
300 ne);
301 }
302 if (start > end) {
303 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
304 }
305 for (int i = start; i <= end; i++) {
306 actionSet.add(jobId + "@" + i);
307 }
308 }
309 else {
310 try {
311 Integer.parseInt(s);
312 }
313 catch (NumberFormatException ne) {
314 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
315 + "'. Integer only.");
316 }
317 actionSet.add(jobId + "@" + s);
318 }
319 }
320
321 Iterator<String> actionsIterator = actionSet.iterator();
322 StringBuilder orSeparatedActions = new StringBuilder("");
323 boolean orRequired = false;
324 while (actionsIterator.hasNext()) {
325 if (orRequired) {
326 orSeparatedActions.append("|");
327 }
328 orSeparatedActions.append(actionsIterator.next().toString());
329 orRequired = true;
330 }
331 if (actionSet.size() > 1 && orRequired) {
332 orSeparatedActions.insert(0, "(");
333 orSeparatedActions.append(")");
334 }
335 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
336 }
337 // if coordinator action logs are to be retrieved based on date range
338 // this block gets the corresponding list of coordinator actions to be used by the log filter
339 if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
340 List<String> coordActionIdList = null;
341 try {
342 coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope);
343 }
344 catch (XException xe) {
345 throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
346 }
347 StringBuilder orSeparatedActions = new StringBuilder("");
348 boolean orRequired = false;
349 for (String coordActionId : coordActionIdList) {
350 if (orRequired) {
351 orSeparatedActions.append("|");
352 }
353 orSeparatedActions.append(coordActionId);
354 orRequired = true;
355 }
356 if (coordActionIdList.size() > 1 && orRequired) {
357 orSeparatedActions.insert(0, "(");
358 orSeparatedActions.append(")");
359 }
360 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
361 }
362 }
363 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
364 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
365 }
366
367 /*
368 * (non-Javadoc)
369 *
370 * @see
371 * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
372 * , boolean)
373 */
374 @Override
375 public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
376 try {
377 CoordSubmitXCommand submit = new CoordSubmitXCommand(conf);
378 return submit.call();
379 }
380 catch (CommandException ex) {
381 throw new CoordinatorEngineException(ex);
382 }
383 }
384
385 /*
386 * (non-Javadoc)
387 *
388 * @see
389 * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
390 */
391 @Override
392 public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException {
393 try {
394 CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf);
395 return submit.call();
396 }
397 catch (CommandException ex) {
398 throw new CoordinatorEngineException(ex);
399 }
400 }
401
402 /*
403 * (non-Javadoc)
404 *
405 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
406 */
407 @Override
408 public void suspend(String jobId) throws CoordinatorEngineException {
409 try {
410 new CoordSuspendXCommand(jobId).call();
411 }
412 catch (CommandException e) {
413 throw new CoordinatorEngineException(e);
414 }
415
416 }
417
418 /*
419 * (non-Javadoc)
420 *
421 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
422 */
423 @Override
424 public WorkflowJob getJob(String jobId) throws BaseEngineException {
425 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
426 }
427
428 /*
429 * (non-Javadoc)
430 *
431 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
432 */
433 @Override
434 public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
435 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
436 }
437
/** Filter names accepted by {@code parseFilter}; names are looked up in lower case. */
private static final Set<String> FILTER_NAMES = new HashSet<String>();

static {
    final String[] acceptedNames = {
        OozieClient.FILTER_USER,
        OozieClient.FILTER_NAME,
        OozieClient.FILTER_GROUP,
        OozieClient.FILTER_STATUS,
        OozieClient.FILTER_ID,
        OozieClient.FILTER_FREQUENCY,
        OozieClient.FILTER_UNIT,
    };
    for (String acceptedName : acceptedNames) {
        FILTER_NAMES.add(acceptedName);
    }
}
449
450 /**
451 * @param filter
452 * @param start
453 * @param len
454 * @return CoordinatorJobInfo
455 * @throws CoordinatorEngineException
456 */
457 public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
458 Map<String, List<String>> filterList = parseFilter(filter);
459
460 try {
461 return new CoordJobsXCommand(filterList, start, len).call();
462 }
463 catch (CommandException ex) {
464 throw new CoordinatorEngineException(ex);
465 }
466 }
467
468
469 // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values
470 private List<String> parseStatusFilter(String filter) throws CoordinatorEngineException {
471 List<String> filterList = new ArrayList<String>();
472 if (filter != null) {
473 //split name;value pairs
474 StringTokenizer st = new StringTokenizer(filter, ";");
475 while (st.hasMoreTokens()) {
476 String token = st.nextToken();
477 if (token.contains("=")) {
478 String[] pair = token.split("=");
479 if (pair.length != 2) {
480 throw new CoordinatorEngineException(ErrorCode.E0421, token,
481 "elements must be name=value pairs");
482 }
483 if (pair[0].equalsIgnoreCase("status")) {
484 String statusValue = pair[1];
485 try {
486 CoordinatorAction.Status.valueOf(statusValue);
487 } catch (IllegalArgumentException ex) {
488 StringBuilder validStatusList = new StringBuilder();
489 for (CoordinatorAction.Status status: CoordinatorAction.Status.values()){
490 validStatusList.append(status.toString()+" ");
491 }
492 // Check for incorrect status value
493 throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
494 "invalid status value [{0}]." + " Valid status values are: [{1}]", statusValue, validStatusList));
495 }
496 filterList.add(statusValue);
497 } else {
498 // Check for incorrect filter option
499 throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
500 "invalid filter [{0}]." + " The only valid filter is \"status\"", pair[0]));
501 }
502 } else {
503 throw new CoordinatorEngineException(ErrorCode.E0421, token,
504 "elements must be name=value pairs");
505 }
506 }
507 }
508 return filterList;
509 }
510
511 /**
512 * @param filter
513 * @return Map<String, List<String>>
514 * @throws CoordinatorEngineException
515 */
516 @VisibleForTesting
517 Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException {
518 Map<String, List<String>> map = new HashMap<String, List<String>>();
519 boolean isTimeUnitSpecified = false;
520 String timeUnit = "MINUTE";
521 boolean isFrequencySpecified = false;
522 String frequency = "";
523 if (filter != null) {
524 StringTokenizer st = new StringTokenizer(filter, ";");
525 while (st.hasMoreTokens()) {
526 String token = st.nextToken();
527 if (token.contains("=")) {
528 String[] pair = token.split("=");
529 if (pair.length != 2) {
530 throw new CoordinatorEngineException(ErrorCode.E0420, filter,
531 "elements must be name=value pairs");
532 }
533 if (!FILTER_NAMES.contains(pair[0].toLowerCase())) {
534 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
535 pair[0]));
536 }
537 if (pair[0].equalsIgnoreCase("frequency")) {
538 isFrequencySpecified = true;
539 try {
540 frequency = (int) Float.parseFloat(pair[1]) + "";
541 continue;
542 }
543 catch (NumberFormatException NANException) {
544 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
545 "invalid value [{0}] for frequency. A numerical value is expected", pair[1]));
546 }
547 }
548 if (pair[0].equalsIgnoreCase("unit")) {
549 isTimeUnitSpecified = true;
550 timeUnit = pair[1];
551 if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
552 && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
553 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
554 "invalid value [{0}] for time unit. "
555 + "Valid value is one of months, days, hours or minutes", pair[1]));
556 }
557 continue;
558 }
559 if (pair[0].equals("status")) {
560 try {
561 CoordinatorJob.Status.valueOf(pair[1]);
562 }
563 catch (IllegalArgumentException ex) {
564 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
565 "invalid status [{0}]", pair[1]));
566 }
567 }
568 List<String> list = map.get(pair[0]);
569 if (list == null) {
570 list = new ArrayList<String>();
571 map.put(pair[0], list);
572 }
573 list.add(pair[1]);
574 } else {
575 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
576 }
577 }
578 // Unit is specified and frequency is not specified
579 if (!isFrequencySpecified && isTimeUnitSpecified) {
580 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
581 + "frequency is specified. Either specify frequency also or else remove the time unit");
582 } else if (isFrequencySpecified) {
583 // Frequency value is specified
584 if (isTimeUnitSpecified) {
585 if (timeUnit.equalsIgnoreCase("months")) {
586 timeUnit = "MONTH";
587 } else if (timeUnit.equalsIgnoreCase("days")) {
588 timeUnit = "DAY";
589 } else if (timeUnit.equalsIgnoreCase("hours")) {
590 // When job details are persisted to database, frequency in hours are converted to minutes.
591 // This conversion is to conform with that.
592 frequency = Integer.parseInt(frequency) * 60 + "";
593 timeUnit = "MINUTE";
594 } else if (timeUnit.equalsIgnoreCase("minutes")) {
595 timeUnit = "MINUTE";
596 }
597 }
598 // Adding the frequency and time unit filters to the filter map
599 List<String> list = new ArrayList<String>();
600 list.add(timeUnit);
601 map.put("unit", list);
602 list = new ArrayList<String>();
603 list.add(frequency);
604 map.put("frequency", list);
605 }
606 }
607 return map;
608 }
609 }