001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie; 019 020import com.google.common.annotations.VisibleForTesting; 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.oozie.client.CoordinatorAction; 024import org.apache.oozie.client.CoordinatorJob; 025import org.apache.oozie.client.OozieClient; 026import org.apache.oozie.client.WorkflowJob; 027import org.apache.oozie.client.rest.RestConstants; 028import org.apache.oozie.command.CommandException; 029import org.apache.oozie.command.coord.CoordActionInfoXCommand; 030import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand; 031import org.apache.oozie.command.coord.CoordActionsKillXCommand; 032import org.apache.oozie.command.coord.CoordChangeXCommand; 033import org.apache.oozie.command.coord.CoordJobXCommand; 034import org.apache.oozie.command.coord.CoordJobsXCommand; 035import org.apache.oozie.command.coord.CoordKillXCommand; 036import org.apache.oozie.command.coord.CoordRerunXCommand; 037import org.apache.oozie.command.coord.CoordResumeXCommand; 038import org.apache.oozie.command.coord.CoordSubmitXCommand; 039import org.apache.oozie.command.coord.CoordSuspendXCommand; 040import org.apache.oozie.command.coord.CoordUpdateXCommand; 041import org.apache.oozie.executor.jpa.JPAExecutorException; 042import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 043import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 044import org.apache.oozie.service.DagXLogInfoService; 045import org.apache.oozie.service.Services; 046import org.apache.oozie.service.XLogStreamingService; 047import org.apache.oozie.util.CoordActionsInDateRange; 048import org.apache.oozie.util.DateUtils; 049import org.apache.oozie.util.Pair; 050import org.apache.oozie.util.ParamChecker; 051import org.apache.oozie.util.XLog; 052import org.apache.oozie.util.XLogFilter; 053import org.apache.oozie.util.XLogUserFilterParam; 054 055import java.io.IOException; 056import java.io.Writer; 057import java.sql.Timestamp; 058import java.text.ParseException; 059import java.util.ArrayList; 060import java.util.Collections; 061import java.util.Comparator; 062import java.util.Date; 063import java.util.HashMap; 064import java.util.HashSet; 065import java.util.Iterator; 066import java.util.LinkedHashSet; 067import java.util.List; 068import java.util.Map; 069import java.util.Set; 070import java.util.StringTokenizer; 071 072public class CoordinatorEngine extends BaseEngine { 073 private static final XLog LOG = XLog.getLog(CoordinatorEngine.class); 074 public final static String COORD_ACTIONS_LOG_MAX_COUNT = "oozie.coord.actions.log.max.count"; 075 private final static int COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT = 50; 076 private final int maxNumActionsForLog; 077 078 public enum FILTER_COMPARATORS { 079 //This ordering is important, dont change this 080 GREATER_EQUAL(">="), GREATER(">"), LESSTHAN_EQUAL("<="), LESSTHAN("<"), NOT_EQUALS("!="), EQUALS("="); 081 082 private final String sign; 083 084 FILTER_COMPARATORS(String sign) { 085 this.sign = sign; 086 } 087 088 public String getSign() { 089 return sign; 090 } 091 } 092 093 public static final String[] VALID_JOB_FILTERS = {OozieClient.FILTER_STATUS, OozieClient.FILTER_NOMINAL_TIME}; 094 095 /** 096 * Create a system Coordinator engine, with no user and no group. 097 */ 098 public CoordinatorEngine() { 099 if (!Services.get().getConf().getBoolean(USE_XCOMMAND, true)) { 100 LOG.debug("Oozie CoordinatorEngine is not using XCommands."); 101 } 102 else { 103 LOG.debug("Oozie CoordinatorEngine is using XCommands."); 104 } 105 maxNumActionsForLog = Services.get().getConf() 106 .getInt(COORD_ACTIONS_LOG_MAX_COUNT, COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT); 107 } 108 109 /** 110 * Create a Coordinator engine to perform operations on behave of a user. 111 * 112 * @param user user name. 113 */ 114 public CoordinatorEngine(String user) { 115 this(); 116 this.user = ParamChecker.notEmpty(user, "user"); 117 } 118 119 /* 120 * (non-Javadoc) 121 * 122 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String) 123 */ 124 @Override 125 public String getDefinition(String jobId) throws BaseEngineException { 126 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); 127 return job.getOrigJobXml(); 128 } 129 130 /** 131 * @param jobId 132 * @return CoordinatorJobBean 133 * @throws BaseEngineException 134 */ 135 private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException { 136 try { 137 return new CoordJobXCommand(jobId).call(); 138 } 139 catch (CommandException ex) { 140 throw new BaseEngineException(ex); 141 } 142 } 143 144 /** 145 * @param actionId 146 * @return CoordinatorActionBean 147 * @throws BaseEngineException 148 */ 149 public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException { 150 try { 151 return new CoordActionInfoXCommand(actionId).call(); 152 } 153 catch (CommandException ex) { 154 throw new BaseEngineException(ex); 155 } 156 } 157 158 /* 159 * (non-Javadoc) 160 * 161 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String) 162 */ 163 @Override 164 public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException { 165 try { 166 return new CoordJobXCommand(jobId).call(); 167 } 168 catch (CommandException ex) { 169 throw new BaseEngineException(ex); 170 } 171 } 172 173 /* 174 * (non-Javadoc) 175 * 176 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int) 177 */ 178 @Override 179 public CoordinatorJobBean getCoordJob(String jobId, String filter, int offset, int length, boolean desc) 180 throws BaseEngineException { 181 Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = parseJobFilter(filter); 182 try { 183 return new CoordJobXCommand(jobId, filterMap, offset, length, desc).call(); 184 } 185 catch (CommandException ex) { 186 throw new BaseEngineException(ex); 187 } 188 } 189 190 /* 191 * (non-Javadoc) 192 * 193 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String) 194 */ 195 @Override 196 public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException { 197 return null; 198 } 199 200 /* 201 * (non-Javadoc) 202 * 203 * @see org.apache.oozie.BaseEngine#kill(java.lang.String) 204 */ 205 @Override 206 public void kill(String jobId) throws CoordinatorEngineException { 207 try { 208 new CoordKillXCommand(jobId).call(); 209 LOG.info("User " + user + " killed the Coordinator job " + jobId); 210 } 211 catch (CommandException e) { 212 throw new CoordinatorEngineException(e); 213 } 214 } 215 216 public CoordinatorActionInfo killActions(String jobId, String rangeType, String scope) throws CoordinatorEngineException { 217 try { 218 return new CoordActionsKillXCommand(jobId, rangeType, scope).call(); 219 } 220 catch (CommandException e) { 221 throw new CoordinatorEngineException(e); 222 } 223 } 224 225 /* (non-Javadoc) 226 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String) 227 */ 228 @Override 229 public void change(String jobId, String changeValue) throws CoordinatorEngineException { 230 try { 231 new CoordChangeXCommand(jobId, changeValue).call(); 232 LOG.info("User " + user + " changed the Coordinator job [" + jobId + "] to " + changeValue); 233 } 234 catch (CommandException e) { 235 throw new CoordinatorEngineException(e); 236 } 237 } 238 239 public CoordinatorActionInfo ignore(String jobId, String type, String scope) throws CoordinatorEngineException { 240 try { 241 LOG.info("User " + user + " ignore a Coordinator Action (s) [" + scope + "] of the Coordinator Job [" 242 + jobId + "]"); 243 return new CoordActionsIgnoreXCommand(jobId, type, scope).call(); 244 } 245 catch (CommandException e) { 246 throw new CoordinatorEngineException(e); 247 } 248 } 249 250 @Override 251 @Deprecated 252 public void reRun(String jobId, Configuration conf) throws BaseEngineException { 253 throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun")); 254 } 255 256 /** 257 * Rerun coordinator actions for given rerunType 258 * 259 * @param jobId 260 * @param rerunType 261 * @param scope 262 * @param refresh 263 * @param noCleanup 264 * @throws BaseEngineException 265 */ 266 public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) 267 throws BaseEngineException { 268 try { 269 return new CoordRerunXCommand(jobId, rerunType, scope, refresh, 270 noCleanup).call(); 271 } 272 catch (CommandException ex) { 273 throw new BaseEngineException(ex); 274 } 275 } 276 277 /* 278 * (non-Javadoc) 279 * 280 * @see org.apache.oozie.BaseEngine#resume(java.lang.String) 281 */ 282 @Override 283 public void resume(String jobId) throws CoordinatorEngineException { 284 try { 285 new CoordResumeXCommand(jobId).call(); 286 } 287 catch (CommandException e) { 288 throw new CoordinatorEngineException(e); 289 } 290 } 291 292 @Override 293 @Deprecated 294 public void start(String jobId) throws BaseEngineException { 295 throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start")); 296 } 297 298 /* 299 * (non-Javadoc) 300 * 301 * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, 302 * java.io.Writer) 303 */ 304 @Override 305 public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, 306 BaseEngineException { 307 308 try { 309 XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params)); 310 filter.setParameter(DagXLogInfoService.JOB, jobId); 311 Date lastTime = null; 312 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); 313 if (job.isTerminalStatus()) { 314 lastTime = job.getLastModifiedTime(); 315 } 316 if (lastTime == null) { 317 lastTime = new Date(); 318 } 319 Services.get().get(XLogStreamingService.class) 320 .streamLog(filter, job.getCreatedTime(), lastTime, writer, params); 321 } 322 catch (Exception e) { 323 throw new IOException(e); 324 } 325 } 326 327 /** 328 * Add list of actions to the filter based on conditions 329 * 330 * @param jobId Job Id 331 * @param logRetrievalScope Value for the retrieval type 332 * @param logRetrievalType Based on which filter criteria the log is retrieved 333 * @param writer writer to stream the log to 334 * @param params additional parameters from the request 335 * @throws IOException 336 * @throws BaseEngineException 337 * @throws CommandException 338 */ 339 public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer, 340 Map<String, String[]> params) throws IOException, BaseEngineException, CommandException { 341 342 Date startTime = null; 343 Date endTime = null; 344 XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params)); 345 346 filter.setParameter(DagXLogInfoService.JOB, jobId); 347 if (logRetrievalScope != null && logRetrievalType != null) { 348 // if coordinator action logs are to be retrieved based on action id range 349 if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) { 350 // Use set implementation that maintains order or elements to achieve reproducibility: 351 Set<String> actionSet = new LinkedHashSet<String>(); 352 String[] list = logRetrievalScope.split(","); 353 for (String s : list) { 354 s = s.trim(); 355 if (s.contains("-")) { 356 String[] range = s.split("-"); 357 if (range.length != 2) { 358 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s 359 + "'"); 360 } 361 int start; 362 int end; 363 try { 364 start = Integer.parseInt(range[0].trim()); 365 } catch (NumberFormatException ne) { 366 throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer", 367 ne); 368 } 369 try { 370 end = Integer.parseInt(range[1].trim()); 371 } catch (NumberFormatException ne) { 372 throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer", 373 ne); 374 } 375 if (start > end) { 376 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'"); 377 } 378 for (int i = start; i <= end; i++) { 379 actionSet.add(jobId + "@" + i); 380 } 381 } 382 else { 383 try { 384 Integer.parseInt(s); 385 } 386 catch (NumberFormatException ne) { 387 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s 388 + "'. Integer only."); 389 } 390 actionSet.add(jobId + "@" + s); 391 } 392 } 393 394 if (actionSet.size() >= maxNumActionsForLog) { 395 throw new CommandException(ErrorCode.E0302, 396 "Retrieving log of too many coordinator actions. Max count is " 397 + maxNumActionsForLog + " actions"); 398 } 399 Iterator<String> actionsIterator = actionSet.iterator(); 400 StringBuilder orSeparatedActions = new StringBuilder(""); 401 boolean orRequired = false; 402 while (actionsIterator.hasNext()) { 403 if (orRequired) { 404 orSeparatedActions.append("|"); 405 } 406 orSeparatedActions.append(actionsIterator.next().toString()); 407 orRequired = true; 408 } 409 if (actionSet.size() > 1 && orRequired) { 410 orSeparatedActions.insert(0, "("); 411 orSeparatedActions.append(")"); 412 } 413 414 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString()); 415 if (actionSet != null && actionSet.size() == 1) { 416 CoordinatorActionBean actionBean = getCoordAction(actionSet.iterator().next()); 417 startTime = actionBean.getCreatedTime(); 418 endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean 419 .getLastModifiedTime(); 420 filter.setActionList(true); 421 } 422 else if (actionSet != null && actionSet.size() > 0) { 423 List<String> tempList = new ArrayList<String>(actionSet); 424 Collections.sort(tempList, new Comparator<String>() { 425 public int compare(String a, String b) { 426 return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo( 427 Integer.valueOf(b.substring(b.lastIndexOf("@") + 1))); 428 } 429 }); 430 startTime = getCoordAction(tempList.get(0)).getCreatedTime(); 431 endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, tempList.get(0), 432 tempList.get(tempList.size() - 1)); 433 filter.setActionList(true); 434 } 435 } 436 // if coordinator action logs are to be retrieved based on date range 437 // this block gets the corresponding list of coordinator actions to be used by the log filter 438 if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) { 439 List<String> coordActionIdList = null; 440 try { 441 coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope); 442 } 443 catch (XException xe) { 444 throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe); 445 } 446 if(coordActionIdList.size() >= maxNumActionsForLog) { 447 throw new CommandException(ErrorCode.E0302, 448 "Retrieving log of too many coordinator actions. Max count is " 449 + maxNumActionsForLog + " actions"); 450 } 451 StringBuilder orSeparatedActions = new StringBuilder(""); 452 boolean orRequired = false; 453 for (String coordActionId : coordActionIdList) { 454 if (orRequired) { 455 orSeparatedActions.append("|"); 456 } 457 orSeparatedActions.append(coordActionId); 458 orRequired = true; 459 } 460 if (coordActionIdList.size() > 1 && orRequired) { 461 orSeparatedActions.insert(0, "("); 462 orSeparatedActions.append(")"); 463 } 464 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString()); 465 if (coordActionIdList != null && coordActionIdList.size() == 1) { 466 CoordinatorActionBean actionBean = getCoordAction(coordActionIdList.get(0)); 467 startTime = actionBean.getCreatedTime(); 468 endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean 469 .getLastModifiedTime(); 470 filter.setActionList(true); 471 } 472 else if (coordActionIdList != null && coordActionIdList.size() > 0) { 473 Collections.sort(coordActionIdList, new Comparator<String>() { 474 public int compare(String a, String b) { 475 return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo( 476 Integer.valueOf(b.substring(b.lastIndexOf("@") + 1))); 477 } 478 }); 479 startTime = getCoordAction(coordActionIdList.get(0)).getCreatedTime(); 480 endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, coordActionIdList.get(0), 481 coordActionIdList.get(coordActionIdList.size() - 1)); 482 filter.setActionList(true); 483 } 484 } 485 } 486 if (startTime == null || endTime == null) { 487 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); 488 if (startTime == null) { 489 startTime = job.getCreatedTime(); 490 } 491 if (endTime == null) { 492 if (job.isTerminalStatus()) { 493 endTime = job.getLastModifiedTime(); 494 } 495 if (endTime == null) { 496 endTime = new Date(); 497 } 498 } 499 } 500 Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params); 501 } 502 503 /* 504 * (non-Javadoc) 505 * 506 * @see 507 * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration 508 * , boolean) 509 */ 510 @Override 511 public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException { 512 try { 513 CoordSubmitXCommand submit = new CoordSubmitXCommand(conf); 514 return submit.call(); 515 } 516 catch (CommandException ex) { 517 throw new CoordinatorEngineException(ex); 518 } 519 } 520 521 /* 522 * (non-Javadoc) 523 * 524 * @see 525 * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration) 526 */ 527 @Override 528 public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException { 529 try { 530 CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf); 531 return submit.call(); 532 } 533 catch (CommandException ex) { 534 throw new CoordinatorEngineException(ex); 535 } 536 } 537 538 /* 539 * (non-Javadoc) 540 * 541 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String) 542 */ 543 @Override 544 public void suspend(String jobId) throws CoordinatorEngineException { 545 try { 546 new CoordSuspendXCommand(jobId).call(); 547 } 548 catch (CommandException e) { 549 throw new CoordinatorEngineException(e); 550 } 551 552 } 553 554 /* 555 * (non-Javadoc) 556 * 557 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String) 558 */ 559 @Override 560 public WorkflowJob getJob(String jobId) throws BaseEngineException { 561 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine")); 562 } 563 564 /* 565 * (non-Javadoc) 566 * 567 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int) 568 */ 569 @Override 570 public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException { 571 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine")); 572 } 573 574 private static final Set<String> FILTER_NAMES = new HashSet<String>(); 575 576 static { 577 FILTER_NAMES.add(OozieClient.FILTER_USER); 578 FILTER_NAMES.add(OozieClient.FILTER_NAME); 579 FILTER_NAMES.add(OozieClient.FILTER_GROUP); 580 FILTER_NAMES.add(OozieClient.FILTER_STATUS); 581 FILTER_NAMES.add(OozieClient.FILTER_ID); 582 FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY); 583 FILTER_NAMES.add(OozieClient.FILTER_UNIT); 584 } 585 586 /** 587 * @param filter 588 * @param start 589 * @param len 590 * @return CoordinatorJobInfo 591 * @throws CoordinatorEngineException 592 */ 593 public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException { 594 Map<String, List<String>> filterList = parseJobsFilter(filter); 595 596 try { 597 return new CoordJobsXCommand(filterList, start, len).call(); 598 } 599 catch (CommandException ex) { 600 throw new CoordinatorEngineException(ex); 601 } 602 } 603 604 // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values 605 public Map<Pair<String, FILTER_COMPARATORS>, List<Object>> parseJobFilter(String filter) throws 606 CoordinatorEngineException { 607 Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = new HashMap<Pair<String, 608 FILTER_COMPARATORS>, List<Object>>(); 609 if (filter != null) { 610 //split name value pairs 611 StringTokenizer st = new StringTokenizer(filter, ";"); 612 while (st.hasMoreTokens()) { 613 String token = st.nextToken().trim(); 614 Pair<String, FILTER_COMPARATORS> pair = null; 615 for (FILTER_COMPARATORS comp : FILTER_COMPARATORS.values()) { 616 if (token.contains(comp.getSign())) { 617 int index = token.indexOf(comp.getSign()); 618 String key = token.substring(0, index); 619 String valueStr = token.substring(index + comp.getSign().length()); 620 Object value; 621 622 if (key.equalsIgnoreCase(OozieClient.FILTER_STATUS)) { 623 value = valueStr.toUpperCase(); 624 try { 625 CoordinatorAction.Status.valueOf((String) value); 626 } catch (IllegalArgumentException ex) { 627 // Check for incorrect status value 628 throw new CoordinatorEngineException(ErrorCode.E0421, filter, 629 XLog.format("invalid status value [{0}]." + " Valid status values are: [{1}]", 630 valueStr, StringUtils.join(CoordinatorAction.Status.values(), ", "))); 631 } 632 633 if (!(comp == FILTER_COMPARATORS.EQUALS || comp == FILTER_COMPARATORS.NOT_EQUALS)) { 634 throw new CoordinatorEngineException(ErrorCode.E0421, filter, 635 XLog.format("invalid comparator [{0}] for status." + " Valid are = and !=", 636 comp.getSign())); 637 } 638 639 pair = Pair.of(OozieClient.FILTER_STATUS, comp); 640 } else if (key.equalsIgnoreCase(OozieClient.FILTER_NOMINAL_TIME)) { 641 try { 642 value = new Timestamp(DateUtils.parseDateUTC(valueStr).getTime()); 643 } catch (ParseException e) { 644 throw new CoordinatorEngineException(ErrorCode.E0421, filter, 645 XLog.format("invalid nominal time [{0}]." + " Valid format: " + 646 "[{1}]", valueStr, DateUtils.ISO8601_UTC_MASK)); 647 } 648 pair = Pair.of(OozieClient.FILTER_NOMINAL_TIME, comp); 649 } else { 650 // Check for incorrect filter option 651 throw new CoordinatorEngineException(ErrorCode.E0421, filter, 652 XLog.format("invalid filter [{0}]." + " Valid filters [{1}]", key, StringUtils.join 653 (VALID_JOB_FILTERS, ", "))); 654 } 655 if (!filterMap.containsKey(pair)) { 656 filterMap.put(pair, new ArrayList<Object>()); 657 } 658 filterMap.get(pair).add(value); 659 break; 660 } 661 } 662 663 if (pair == null) { 664 //token doesn't contain comparator 665 throw new CoordinatorEngineException(ErrorCode.E0421, filter, 666 "filter should be of format <key><comparator><value> pairs"); 667 } 668 } 669 } 670 return filterMap; 671 } 672 673 /** 674 * @param filter 675 * @return Map<String, List<String>> 676 * @throws CoordinatorEngineException 677 */ 678 @VisibleForTesting 679 Map<String, List<String>> parseJobsFilter(String filter) throws CoordinatorEngineException { 680 Map<String, List<String>> map = new HashMap<String, List<String>>(); 681 boolean isTimeUnitSpecified = false; 682 String timeUnit = "MINUTE"; 683 boolean isFrequencySpecified = false; 684 String frequency = ""; 685 if (filter != null) { 686 StringTokenizer st = new StringTokenizer(filter, ";"); 687 while (st.hasMoreTokens()) { 688 String token = st.nextToken(); 689 if (token.contains("=")) { 690 String[] pair = token.split("="); 691 if (pair.length != 2) { 692 throw new CoordinatorEngineException(ErrorCode.E0420, filter, 693 "elements must be name=value pairs"); 694 } 695 if (!FILTER_NAMES.contains(pair[0].toLowerCase())) { 696 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]", 697 pair[0])); 698 } 699 if (pair[0].equalsIgnoreCase("frequency")) { 700 isFrequencySpecified = true; 701 try { 702 frequency = (int) Float.parseFloat(pair[1]) + ""; 703 continue; 704 } 705 catch (NumberFormatException NANException) { 706 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( 707 "invalid value [{0}] for frequency. A numerical value is expected", pair[1])); 708 } 709 } 710 if (pair[0].equalsIgnoreCase("unit")) { 711 isTimeUnitSpecified = true; 712 timeUnit = pair[1]; 713 if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days") 714 && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) { 715 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( 716 "invalid value [{0}] for time unit. " 717 + "Valid value is one of months, days, hours or minutes", pair[1])); 718 } 719 continue; 720 } 721 if (pair[0].equals("status")) { 722 try { 723 CoordinatorJob.Status.valueOf(pair[1]); 724 } 725 catch (IllegalArgumentException ex) { 726 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( 727 "invalid status [{0}]", pair[1])); 728 } 729 } 730 List<String> list = map.get(pair[0]); 731 if (list == null) { 732 list = new ArrayList<String>(); 733 map.put(pair[0], list); 734 } 735 list.add(pair[1]); 736 } else { 737 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs"); 738 } 739 } 740 // Unit is specified and frequency is not specified 741 if (!isFrequencySpecified && isTimeUnitSpecified) { 742 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when " 743 + "frequency is specified. Either specify frequency also or else remove the time unit"); 744 } else if (isFrequencySpecified) { 745 // Frequency value is specified 746 if (isTimeUnitSpecified) { 747 if (timeUnit.equalsIgnoreCase("months")) { 748 timeUnit = "MONTH"; 749 } else if (timeUnit.equalsIgnoreCase("days")) { 750 timeUnit = "DAY"; 751 } else if (timeUnit.equalsIgnoreCase("hours")) { 752 // When job details are persisted to database, frequency in hours are converted to minutes. 753 // This conversion is to conform with that. 754 frequency = Integer.parseInt(frequency) * 60 + ""; 755 timeUnit = "MINUTE"; 756 } else if (timeUnit.equalsIgnoreCase("minutes")) { 757 timeUnit = "MINUTE"; 758 } 759 } 760 // Adding the frequency and time unit filters to the filter map 761 List<String> list = new ArrayList<String>(); 762 list.add(timeUnit); 763 map.put("unit", list); 764 list = new ArrayList<String>(); 765 list.add(frequency); 766 map.put("frequency", list); 767 } 768 } 769 return map; 770 } 771 772 public List<WorkflowJobBean> getReruns(String coordActionId) throws CoordinatorEngineException { 773 List<WorkflowJobBean> wfBeans; 774 try { 775 wfBeans = WorkflowJobQueryExecutor.getInstance().getList(WorkflowJobQuery.GET_WORKFLOWS_PARENT_COORD_RERUN, 776 coordActionId); 777 } 778 catch (JPAExecutorException e) { 779 throw new CoordinatorEngineException(e); 780 } 781 return wfBeans; 782 } 783 784 /** 785 * Update coord job definition. 786 * 787 * @param conf the conf 788 * @param jobId the job id 789 * @param dryrun the dryrun 790 * @param showDiff the show diff 791 * @return the string 792 * @throws CoordinatorEngineException the coordinator engine exception 793 */ 794 public String updateJob(Configuration conf, String jobId, boolean dryrun, boolean showDiff) 795 throws CoordinatorEngineException { 796 try { 797 CoordUpdateXCommand update = new CoordUpdateXCommand(dryrun, conf, jobId, showDiff); 798 return update.call(); 799 } 800 catch (CommandException ex) { 801 throw new CoordinatorEngineException(ex); 802 } 803 } 804}