This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.IOException;
021 import java.io.Writer;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.Iterator;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.StringTokenizer;
031 import org.apache.hadoop.conf.Configuration;
032 import org.apache.oozie.client.CoordinatorAction;
033 import org.apache.oozie.client.CoordinatorJob;
034 import org.apache.oozie.client.OozieClient;
035 import org.apache.oozie.client.WorkflowJob;
036 import org.apache.oozie.client.rest.RestConstants;
037 import org.apache.oozie.command.CommandException;
038 import org.apache.oozie.command.coord.CoordActionInfoXCommand;
039 import org.apache.oozie.util.CoordActionsInDateRange;
040 import org.apache.oozie.command.coord.CoordChangeXCommand;
041 import org.apache.oozie.command.coord.CoordJobXCommand;
042 import org.apache.oozie.command.coord.CoordJobsXCommand;
043 import org.apache.oozie.command.coord.CoordKillXCommand;
044 import org.apache.oozie.command.coord.CoordRerunXCommand;
045 import org.apache.oozie.command.coord.CoordResumeXCommand;
046 import org.apache.oozie.command.coord.CoordSubmitXCommand;
047 import org.apache.oozie.command.coord.CoordSuspendXCommand;
048 import org.apache.oozie.service.DagXLogInfoService;
049 import org.apache.oozie.service.Services;
050 import org.apache.oozie.service.XLogService;
051 import org.apache.oozie.util.ParamChecker;
052 import org.apache.oozie.util.XLog;
053 import org.apache.oozie.util.XLogStreamer;
054
055 public class CoordinatorEngine extends BaseEngine {
056 private static XLog LOG = XLog.getLog(CoordinatorEngine.class);
057
/**
 * Builds a system Coordinator engine that has neither a user nor a group.
 * <p>
 * The only work performed is a debug log line reporting whether the
 * {@code USE_XCOMMAND} configuration flag (read with a default of {@code true})
 * is switched on.
 */
public CoordinatorEngine() {
    boolean usingXCommands = Services.get().getConf().getBoolean(USE_XCOMMAND, true);
    if (usingXCommands) {
        LOG.debug("Oozie CoordinatorEngine is using XCommands.");
        return;
    }
    LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
}
069
070 /**
071 * Create a Coordinator engine to perform operations on behave of a user.
072 *
073 * @param user user name.
074 * @param authToken the authentication token.
075 */
076 public CoordinatorEngine(String user, String authToken) {
077 this();
078 this.user = ParamChecker.notEmpty(user, "user");
079 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
080 }
081
082 /*
083 * (non-Javadoc)
084 *
085 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
086 */
087 @Override
088 public String getDefinition(String jobId) throws BaseEngineException {
089 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
090 return job.getOrigJobXml();
091 }
092
093 /**
094 * @param jobId
095 * @return CoordinatorJobBean
096 * @throws BaseEngineException
097 */
098 private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
099 try {
100 return new CoordJobXCommand(jobId).call();
101 }
102 catch (CommandException ex) {
103 throw new BaseEngineException(ex);
104 }
105 }
106
107 /**
108 * @param actionId
109 * @return CoordinatorActionBean
110 * @throws BaseEngineException
111 */
112 public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
113 try {
114 return new CoordActionInfoXCommand(actionId).call();
115 }
116 catch (CommandException ex) {
117 throw new BaseEngineException(ex);
118 }
119 }
120
121 /*
122 * (non-Javadoc)
123 *
124 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
125 */
126 @Override
127 public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
128 try {
129 return new CoordJobXCommand(jobId).call();
130 }
131 catch (CommandException ex) {
132 throw new BaseEngineException(ex);
133 }
134 }
135
136 /*
137 * (non-Javadoc)
138 *
139 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
140 */
141 @Override
142 public CoordinatorJobBean getCoordJob(String jobId, String filter, int start, int length) throws BaseEngineException {
143 List<String> filterList = parseStatusFilter(filter);
144 try {
145 return new CoordJobXCommand(jobId, filterList, start, length)
146 .call();
147 }
148 catch (CommandException ex) {
149 throw new BaseEngineException(ex);
150 }
151 }
152
153 /*
154 * (non-Javadoc)
155 *
156 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
157 */
158 @Override
159 public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
160 return null;
161 }
162
163 /*
164 * (non-Javadoc)
165 *
166 * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
167 */
168 @Override
169 public void kill(String jobId) throws CoordinatorEngineException {
170 try {
171 new CoordKillXCommand(jobId).call();
172 LOG.info("User " + user + " killed the Coordinator job " + jobId);
173 }
174 catch (CommandException e) {
175 throw new CoordinatorEngineException(e);
176 }
177 }
178
179 /* (non-Javadoc)
180 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
181 */
182 @Override
183 public void change(String jobId, String changeValue) throws CoordinatorEngineException {
184 try {
185 new CoordChangeXCommand(jobId, changeValue).call();
186 LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue);
187 }
188 catch (CommandException e) {
189 throw new CoordinatorEngineException(e);
190 }
191 }
192
193 @Override
194 @Deprecated
195 public void reRun(String jobId, Configuration conf) throws BaseEngineException {
196 throw new BaseEngineException(new XException(ErrorCode.E0301));
197 }
198
199 /**
200 * Rerun coordinator actions for given rerunType
201 *
202 * @param jobId
203 * @param rerunType
204 * @param scope
205 * @param refresh
206 * @param noCleanup
207 * @throws BaseEngineException
208 */
209 public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup)
210 throws BaseEngineException {
211 try {
212 return new CoordRerunXCommand(jobId, rerunType, scope, refresh,
213 noCleanup).call();
214 }
215 catch (CommandException ex) {
216 throw new BaseEngineException(ex);
217 }
218 }
219
220 /*
221 * (non-Javadoc)
222 *
223 * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
224 */
225 @Override
226 public void resume(String jobId) throws CoordinatorEngineException {
227 try {
228 new CoordResumeXCommand(jobId).call();
229 }
230 catch (CommandException e) {
231 throw new CoordinatorEngineException(e);
232 }
233 }
234
235 @Override
236 @Deprecated
237 public void start(String jobId) throws BaseEngineException {
238 throw new BaseEngineException(new XException(ErrorCode.E0301));
239 }
240
241 /*
242 * (non-Javadoc)
243 *
244 * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String,
245 * java.io.Writer)
246 */
247 @Override
248 public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException {
249 XLogStreamer.Filter filter = new XLogStreamer.Filter();
250 filter.setParameter(DagXLogInfoService.JOB, jobId);
251
252 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
253 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
254 }
255
256 /**
257 * Add list of actions to the filter based on conditions
258 *
259 * @param jobId Job Id
260 * @param logRetrievalScope Value for the retrieval type
261 * @param logRetrievalType Based on which filter criteria the log is retrieved
262 * @param writer writer to stream the log to
263 * @throws IOException
264 * @throws BaseEngineException
265 * @throws CommandException
266 */
267 public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer)
268 throws IOException, BaseEngineException, CommandException {
269 XLogStreamer.Filter filter = new XLogStreamer.Filter();
270 filter.setParameter(DagXLogInfoService.JOB, jobId);
271 if (logRetrievalScope != null && logRetrievalType != null) {
272 // if coordinator action logs are to be retrieved based on action id range
273 if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
274 Set<String> actions = new HashSet<String>();
275 String[] list = logRetrievalScope.split(",");
276 for (String s : list) {
277 s = s.trim();
278 if (s.contains("-")) {
279 String[] range = s.split("-");
280 if (range.length != 2) {
281 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
282 + "'");
283 }
284 int start;
285 int end;
286 try {
287 start = Integer.parseInt(range[0].trim());
288 end = Integer.parseInt(range[1].trim());
289 if (start > end) {
290 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
291 + "'");
292 }
293 }
294 catch (NumberFormatException ne) {
295 throw new CommandException(ErrorCode.E0302, ne);
296 }
297 for (int i = start; i <= end; i++) {
298 actions.add(jobId + "@" + i);
299 }
300 }
301 else {
302 try {
303 Integer.parseInt(s);
304 }
305 catch (NumberFormatException ne) {
306 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
307 + "'. Integer only.");
308 }
309 actions.add(jobId + "@" + s);
310 }
311 }
312
313 Iterator<String> actionsIterator = actions.iterator();
314 StringBuilder orSeparatedActions = new StringBuilder("");
315 boolean orRequired = false;
316 while (actionsIterator.hasNext()) {
317 if (orRequired) {
318 orSeparatedActions.append("|");
319 }
320 orSeparatedActions.append(actionsIterator.next().toString());
321 orRequired = true;
322 }
323 if (actions.size() > 1 && orRequired) {
324 orSeparatedActions.insert(0, "(");
325 orSeparatedActions.append(")");
326 }
327 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
328 }
329 // if coordinator action logs are to be retrieved based on date range
330 // this block gets the corresponding list of coordinator actions to be used by the log filter
331 if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
332 List<String> coordActionIdList = null;
333 try {
334 coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope);
335 }
336 catch (XException xe) {
337 throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
338 }
339 StringBuilder orSeparatedActions = new StringBuilder("");
340 boolean orRequired = false;
341 for (String coordActionId : coordActionIdList) {
342 if (orRequired) {
343 orSeparatedActions.append("|");
344 }
345 orSeparatedActions.append(coordActionId);
346 orRequired = true;
347 }
348 if (coordActionIdList.size() > 1 && orRequired) {
349 orSeparatedActions.insert(0, "(");
350 orSeparatedActions.append(")");
351 }
352 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
353 }
354 }
355 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
356 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
357 }
358
359 /*
360 * (non-Javadoc)
361 *
362 * @see
363 * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
364 * , boolean)
365 */
366 @Override
367 public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
368 try {
369 CoordSubmitXCommand submit = new CoordSubmitXCommand(conf,
370 getAuthToken());
371 return submit.call();
372 }
373 catch (CommandException ex) {
374 throw new CoordinatorEngineException(ex);
375 }
376 }
377
378 /*
379 * (non-Javadoc)
380 *
381 * @see
382 * org.apache.oozie.BaseEngine#dryrunSubmit(org.apache.hadoop.conf.Configuration
383 * , boolean)
384 */
385 @Override
386 public String dryrunSubmit(Configuration conf, boolean startJob) throws CoordinatorEngineException {
387 try {
388 CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf,
389 getAuthToken());
390 return submit.call();
391 }
392 catch (CommandException ex) {
393 throw new CoordinatorEngineException(ex);
394 }
395 }
396
397 /*
398 * (non-Javadoc)
399 *
400 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
401 */
402 @Override
403 public void suspend(String jobId) throws CoordinatorEngineException {
404 try {
405 new CoordSuspendXCommand(jobId).call();
406 }
407 catch (CommandException e) {
408 throw new CoordinatorEngineException(e);
409 }
410
411 }
412
413 /*
414 * (non-Javadoc)
415 *
416 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
417 */
418 @Override
419 public WorkflowJob getJob(String jobId) throws BaseEngineException {
420 throw new BaseEngineException(new XException(ErrorCode.E0301));
421 }
422
423 /*
424 * (non-Javadoc)
425 *
426 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
427 */
428 @Override
429 public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
430 throw new BaseEngineException(new XException(ErrorCode.E0301));
431 }
432
// Filter names accepted by parseFilter; populated once when the class is initialised.
private static final Set<String> FILTER_NAMES = new HashSet<String>();

static {
    String[] acceptedNames = {
        OozieClient.FILTER_USER,
        OozieClient.FILTER_NAME,
        OozieClient.FILTER_GROUP,
        OozieClient.FILTER_STATUS,
        OozieClient.FILTER_ID,
        OozieClient.FILTER_FREQUENCY,
        OozieClient.FILTER_UNIT
    };
    for (String acceptedName : acceptedNames) {
        FILTER_NAMES.add(acceptedName);
    }
}
444
445 /**
446 * @param filter
447 * @param start
448 * @param len
449 * @return CoordinatorJobInfo
450 * @throws CoordinatorEngineException
451 */
452 public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
453 Map<String, List<String>> filterList = parseFilter(filter);
454
455 try {
456 return new CoordJobsXCommand(filterList, start, len).call();
457 }
458 catch (CommandException ex) {
459 throw new CoordinatorEngineException(ex);
460 }
461 }
462
463
464 // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values
465 private List<String> parseStatusFilter(String filter) throws CoordinatorEngineException {
466 List<String> filterList = new ArrayList<String>();
467 if (filter != null) {
468 //split name;value pairs
469 StringTokenizer st = new StringTokenizer(filter, ";");
470 while (st.hasMoreTokens()) {
471 String token = st.nextToken();
472 if (token.contains("=")) {
473 String[] pair = token.split("=");
474 if (pair.length != 2) {
475 throw new CoordinatorEngineException(ErrorCode.E0421, token,
476 "elements must be name=value pairs");
477 }
478 if (pair[0].equalsIgnoreCase("status")) {
479 String statusValue = pair[1];
480 try {
481 CoordinatorAction.Status.valueOf(statusValue);
482 } catch (IllegalArgumentException ex) {
483 StringBuilder validStatusList = new StringBuilder();
484 for (CoordinatorAction.Status status: CoordinatorAction.Status.values()){
485 validStatusList.append(status.toString()+" ");
486 }
487 // Check for incorrect status value
488 throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
489 "invalid status value [{0}]." + " Valid status values are: [{1}]", statusValue, validStatusList));
490 }
491 filterList.add(statusValue);
492 } else {
493 // Check for incorrect filter option
494 throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
495 "invalid filter [{0}]." + " The only valid filter is \"status\"", pair[0]));
496 }
497 } else {
498 throw new CoordinatorEngineException(ErrorCode.E0421, token,
499 "elements must be name=value pairs");
500 }
501 }
502 }
503 return filterList;
504 }
505
506 /**
507 * @param filter
508 * @return Map<String, List<String>>
509 * @throws CoordinatorEngineException
510 */
511 private Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException {
512 Map<String, List<String>> map = new HashMap<String, List<String>>();
513 boolean isTimeUnitSpecified = false;
514 String timeUnit = "MINUTE";
515 boolean isFrequencySpecified = false;
516 String frequency = "";
517 if (filter != null) {
518 StringTokenizer st = new StringTokenizer(filter, ";");
519 while (st.hasMoreTokens()) {
520 String token = st.nextToken();
521 if (token.contains("=")) {
522 String[] pair = token.split("=");
523 if (pair.length != 2) {
524 throw new CoordinatorEngineException(ErrorCode.E0420, filter,
525 "elements must be name=value pairs");
526 }
527 if (!FILTER_NAMES.contains(pair[0].toLowerCase())) {
528 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
529 pair[0]));
530 }
531 if (pair[0].equalsIgnoreCase("frequency")) {
532 isFrequencySpecified = true;
533 try {
534 frequency = (int) Float.parseFloat(pair[1]) + "";
535 continue;
536 }
537 catch (NumberFormatException NANException) {
538 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
539 "invalid value [{0}] for frequency. A numerical value is expected", pair[1]));
540 }
541 }
542 if (pair[0].equalsIgnoreCase("unit")) {
543 isTimeUnitSpecified = true;
544 timeUnit = pair[1];
545 if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
546 && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
547 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
548 "invalid value [{0}] for time unit. "
549 + "Valid value is one of months, days, hours or minutes", pair[1]));
550 }
551 continue;
552 }
553 if (pair[0].equals("status")) {
554 try {
555 CoordinatorJob.Status.valueOf(pair[1]);
556 }
557 catch (IllegalArgumentException ex) {
558 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
559 "invalid status [{0}]", pair[1]));
560 }
561 }
562 List<String> list = map.get(pair[0]);
563 if (list == null) {
564 list = new ArrayList<String>();
565 map.put(pair[0], list);
566 }
567 list.add(pair[1]);
568 } else {
569 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
570 }
571 }
572 // Unit is specified and frequency is not specified
573 if (!isFrequencySpecified && isTimeUnitSpecified) {
574 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
575 + "frequency is specified. Either specify frequency also or else remove the time unit");
576 } else if (isFrequencySpecified) {
577 // Frequency value is specified
578 if (isTimeUnitSpecified) {
579 if (timeUnit.equalsIgnoreCase("months")) {
580 timeUnit = "MONTH";
581 } else if (timeUnit.equalsIgnoreCase("days")) {
582 timeUnit = "DAY";
583 } else if (timeUnit.equalsIgnoreCase("hours")) {
584 // When job details are persisted to database, frequency in hours are converted to minutes.
585 // This conversion is to conform with that.
586 frequency = Integer.parseInt(frequency) * 60 + "";
587 timeUnit = "MINUTE";
588 } else if (timeUnit.equalsIgnoreCase("minutes")) {
589 timeUnit = "MINUTE";
590 }
591 }
592 // Adding the frequency and time unit filters to the filter map
593 List<String> list = new ArrayList<String>();
594 list.add(timeUnit);
595 map.put("unit", list);
596 list = new ArrayList<String>();
597 list.add(frequency);
598 map.put("frequency", list);
599 }
600 }
601 return map;
602 }
603 }