001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie; 020 021import java.io.IOException; 022import java.io.Writer; 023import java.sql.Timestamp; 024import java.text.ParseException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.Comparator; 028import java.util.Date; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.LinkedHashSet; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036import java.util.StringTokenizer; 037 038import org.apache.commons.lang.StringUtils; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.oozie.client.CoordinatorAction; 041import org.apache.oozie.client.CoordinatorJob; 042import org.apache.oozie.client.OozieClient; 043import org.apache.oozie.client.WorkflowJob; 044import org.apache.oozie.client.rest.RestConstants; 045import org.apache.oozie.command.CommandException; 046import org.apache.oozie.command.OperationType; 047import org.apache.oozie.command.coord.BulkCoordXCommand; 048import org.apache.oozie.command.coord.CoordActionInfoXCommand; 049import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand; 050import org.apache.oozie.command.coord.CoordActionsKillXCommand; 051import org.apache.oozie.command.coord.CoordChangeXCommand; 052import org.apache.oozie.command.coord.CoordActionMissingDependenciesXCommand; 053import org.apache.oozie.command.coord.CoordJobXCommand; 054import org.apache.oozie.command.coord.CoordJobsXCommand; 055import org.apache.oozie.command.coord.CoordKillXCommand; 056import org.apache.oozie.command.coord.CoordRerunXCommand; 057import org.apache.oozie.command.coord.CoordResumeXCommand; 058import org.apache.oozie.command.coord.CoordSLAAlertsDisableXCommand; 059import org.apache.oozie.command.coord.CoordSLAAlertsEnableXCommand; 060import org.apache.oozie.command.coord.CoordSLAChangeXCommand; 061import org.apache.oozie.command.coord.CoordSubmitXCommand; 062import org.apache.oozie.command.coord.CoordSuspendXCommand; 063import org.apache.oozie.command.coord.CoordUpdateXCommand; 064import org.apache.oozie.command.coord.CoordWfActionInfoXCommand; 065import org.apache.oozie.dependency.ActionDependency; 066import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 067import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 068import org.apache.oozie.executor.jpa.JPAExecutorException; 069import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 070import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 071import org.apache.oozie.service.DagXLogInfoService; 072import org.apache.oozie.service.Services; 073import org.apache.oozie.service.XLogStreamingService; 074import org.apache.oozie.util.CoordActionsInDateRange; 075import org.apache.oozie.util.DateUtils; 076import org.apache.oozie.util.JobUtils; 077import org.apache.oozie.util.Pair; 078import org.apache.oozie.util.ParamChecker; 079import org.apache.oozie.util.XLog; 080import org.apache.oozie.util.XLogFilter; 081import org.apache.oozie.util.XLogStreamer; 082import org.apache.oozie.util.XLogUserFilterParam; 083 084import com.google.common.annotations.VisibleForTesting; 085 086public class CoordinatorEngine extends BaseEngine { 087 private static final XLog LOG = XLog.getLog(CoordinatorEngine.class); 088 public final static String COORD_ACTIONS_LOG_MAX_COUNT = "oozie.coord.actions.log.max.count"; 089 private final static int COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT = 50; 090 private final int maxNumActionsForLog; 091 092 public enum FILTER_COMPARATORS { 093 //This ordering is important, dont change this 094 GREATER_EQUAL(">="), GREATER(">"), LESSTHAN_EQUAL("<="), LESSTHAN("<"), NOT_EQUALS("!="), EQUALS("="); 095 096 private final String sign; 097 098 FILTER_COMPARATORS(String sign) { 099 this.sign = sign; 100 } 101 102 public String getSign() { 103 return sign; 104 } 105 } 106 107 public static final String[] VALID_JOB_FILTERS = {OozieClient.FILTER_STATUS, OozieClient.FILTER_NOMINAL_TIME}; 108 109 /** 110 * Create a system Coordinator engine, with no user and no group. 111 */ 112 public CoordinatorEngine() { 113 maxNumActionsForLog = Services.get().getConf() 114 .getInt(COORD_ACTIONS_LOG_MAX_COUNT, COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT); 115 } 116 117 /** 118 * Create a Coordinator engine to perform operations on behave of a user. 119 * 120 * @param user user name. 121 */ 122 public CoordinatorEngine(String user) { 123 this(); 124 this.user = ParamChecker.notEmpty(user, "user"); 125 } 126 127 /* 128 * (non-Javadoc) 129 * 130 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String) 131 */ 132 @Override 133 public String getDefinition(String jobId) throws BaseEngineException { 134 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); 135 return job.getOrigJobXml(); 136 } 137 138 /** 139 * @param jobId the job ID 140 * @return CoordinatorJobBean 141 * @throws BaseEngineException if the bean could not be retrieved 142 */ 143 private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException { 144 try { 145 return new CoordJobXCommand(jobId).call(); 146 } 147 catch (CommandException ex) { 148 throw new BaseEngineException(ex); 149 } 150 } 151 152 /** 153 * @param actionId the ID of the action 154 * @return CoordinatorActionBean 155 * @throws BaseEngineException if the bean could not be retrieved 156 */ 157 public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException { 158 try { 159 return new CoordActionInfoXCommand(actionId).call(); 160 } 161 catch (CommandException ex) { 162 throw new BaseEngineException(ex); 163 } 164 } 165 166 /* 167 * (non-Javadoc) 168 * 169 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String) 170 */ 171 @Override 172 public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException { 173 try { 174 return new CoordJobXCommand(jobId).call(); 175 } 176 catch (CommandException ex) { 177 throw new BaseEngineException(ex); 178 } 179 } 180 181 /* 182 * (non-Javadoc) 183 * 184 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int) 185 */ 186 @Override 187 public CoordinatorJobBean getCoordJob(String jobId, String filter, int offset, int length, boolean desc) 188 throws BaseEngineException { 189 Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = parseJobFilter(filter); 190 try { 191 return new CoordJobXCommand(jobId, filterMap, offset, length, desc).call(); 192 } 193 catch (CommandException ex) { 194 throw new BaseEngineException(ex); 195 } 196 } 197 198 /* 199 * (non-Javadoc) 200 * 201 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String) 202 */ 203 @Override 204 public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException { 205 return null; 206 } 207 208 /* 209 * (non-Javadoc) 210 * 211 * @see org.apache.oozie.BaseEngine#kill(java.lang.String) 212 */ 213 @Override 214 public void kill(String jobId) throws CoordinatorEngineException { 215 try { 216 new CoordKillXCommand(jobId).call(); 217 LOG.info("User " + user + " killed the Coordinator job " + jobId); 218 } 219 catch (CommandException e) { 220 throw new CoordinatorEngineException(e); 221 } 222 } 223 224 public CoordinatorActionInfo killActions(String jobId, String rangeType, String scope) throws CoordinatorEngineException { 225 try { 226 return new CoordActionsKillXCommand(jobId, rangeType, scope).call(); 227 } 228 catch (CommandException e) { 229 throw new CoordinatorEngineException(e); 230 } 231 } 232 233 /* (non-Javadoc) 234 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String) 235 */ 236 @Override 237 public void change(String jobId, String changeValue) throws CoordinatorEngineException { 238 try { 239 new CoordChangeXCommand(jobId, changeValue).call(); 240 LOG.info("User " + user + " changed the Coordinator job [" + jobId + "] to " + changeValue); 241 } 242 catch (CommandException e) { 243 throw new CoordinatorEngineException(e); 244 } 245 } 246 247 public CoordinatorActionInfo ignore(String jobId, String type, String scope) throws CoordinatorEngineException { 248 try { 249 LOG.info("User " + user + " ignore a Coordinator Action (s) [" + scope + "] of the Coordinator Job [" 250 + jobId + "]"); 251 return new CoordActionsIgnoreXCommand(jobId, type, scope).call(); 252 } 253 catch (CommandException e) { 254 throw new CoordinatorEngineException(e); 255 } 256 } 257 258 @Override 259 @Deprecated 260 public void reRun(String jobId, Configuration conf) throws BaseEngineException { 261 throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun")); 262 } 263 264 /** 265 * Rerun coordinator actions for given rerunType 266 * 267 * @param jobId the job ID 268 * @param rerunType rerun type {@link RestConstants#JOB_COORD_SCOPE_DATE} or {@link RestConstants#JOB_COORD_SCOPE_ACTION} 269 * @param scope the rerun scope for given rerunType separated by "," 270 * @param refresh true if user wants to refresh input/output dataset urls 271 * @param noCleanup false if user wants to cleanup output events for given rerun actions 272 * @param failed true if user wants to rerun only failed nodes 273 * @param conf configuration values for actions 274 * @return the action info 275 * @throws BaseEngineException thrown if the actions could not be rerun 276 */ 277 public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup, 278 boolean failed, Configuration conf) 279 throws BaseEngineException { 280 try { 281 return new CoordRerunXCommand(jobId, rerunType, scope, refresh, 282 noCleanup, failed, conf).call(); 283 } 284 catch (CommandException ex) { 285 throw new BaseEngineException(ex); 286 } 287 } 288 289 /* 290 * (non-Javadoc) 291 * 292 * @see org.apache.oozie.BaseEngine#resume(java.lang.String) 293 */ 294 @Override 295 public void resume(String jobId) throws CoordinatorEngineException { 296 try { 297 new CoordResumeXCommand(jobId).call(); 298 } 299 catch (CommandException e) { 300 throw new CoordinatorEngineException(e); 301 } 302 } 303 304 @Override 305 @Deprecated 306 public void start(String jobId) throws BaseEngineException { 307 throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start")); 308 } 309 310 311 @Override 312 protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) 313 throws IOException, BaseEngineException { 314 logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId); 315 Date lastTime = null; 316 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); 317 if (job.isTerminalStatus()) { 318 lastTime = job.getLastModifiedTime(); 319 } 320 if (lastTime == null) { 321 lastTime = new Date(); 322 } 323 Services.get().get(XLogStreamingService.class).streamLog(logStreamer, job.getCreatedTime(), lastTime, writer); 324 } 325 326 /** 327 * Add list of actions to the filter based on conditions 328 * 329 * @param jobId Job Id 330 * @param logRetrievalScope Value for the retrieval type 331 * @param logRetrievalType Based on which filter criteria the log is retrieved 332 * @param writer writer to stream the log to 333 * @param requestParameters additional parameters from the request 334 * @throws IOException in case of IO error 335 * @throws BaseEngineException if there is an error during streaming 336 * @throws CommandException if a parameter could not be parsed 337 */ 338 public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer, 339 Map<String, String[]> requestParameters) throws IOException, BaseEngineException, CommandException { 340 341 Date startTime = null; 342 Date endTime = null; 343 XLogFilter filter = new XLogFilter(new XLogUserFilterParam(requestParameters)); 344 345 filter.setParameter(DagXLogInfoService.JOB, jobId); 346 if (logRetrievalScope != null && logRetrievalType != null) { 347 // if coordinator action logs are to be retrieved based on action id range 348 if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) { 349 // Use set implementation that maintains order or elements to achieve reproducibility: 350 Set<String> actionSet = new LinkedHashSet<String>(); 351 String[] list = logRetrievalScope.split(","); 352 for (String s : list) { 353 s = s.trim(); 354 if (s.contains("-")) { 355 String[] range = s.split("-"); 356 if (range.length != 2) { 357 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s 358 + "'"); 359 } 360 int start; 361 int end; 362 try { 363 start = Integer.parseInt(range[0].trim()); 364 } catch (NumberFormatException ne) { 365 throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer", 366 ne); 367 } 368 try { 369 end = Integer.parseInt(range[1].trim()); 370 } catch (NumberFormatException ne) { 371 throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer", 372 ne); 373 } 374 if (start > end) { 375 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'"); 376 } 377 for (int i = start; i <= end; i++) { 378 actionSet.add(jobId + "@" + i); 379 } 380 } 381 else { 382 try { 383 Integer.parseInt(s); 384 } 385 catch (NumberFormatException ne) { 386 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s 387 + "'. Integer only."); 388 } 389 actionSet.add(jobId + "@" + s); 390 } 391 } 392 393 if (actionSet.size() >= maxNumActionsForLog) { 394 throw new CommandException(ErrorCode.E0302, 395 "Retrieving log of too many coordinator actions. Max count is " 396 + maxNumActionsForLog + " actions"); 397 } 398 Iterator<String> actionsIterator = actionSet.iterator(); 399 StringBuilder orSeparatedActions = new StringBuilder(""); 400 boolean orRequired = false; 401 while (actionsIterator.hasNext()) { 402 if (orRequired) { 403 orSeparatedActions.append("|"); 404 } 405 orSeparatedActions.append(actionsIterator.next().toString()); 406 orRequired = true; 407 } 408 if (actionSet.size() > 1 && orRequired) { 409 orSeparatedActions.insert(0, "("); 410 orSeparatedActions.append(")"); 411 } 412 413 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString()); 414 if (actionSet != null && actionSet.size() == 1) { 415 CoordinatorActionBean actionBean = getCoordAction(actionSet.iterator().next()); 416 startTime = actionBean.getCreatedTime(); 417 endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean 418 .getLastModifiedTime(); 419 filter.setActionList(true); 420 } 421 else if (actionSet != null && actionSet.size() > 0) { 422 List<String> tempList = new ArrayList<String>(actionSet); 423 Collections.sort(tempList, new Comparator<String>() { 424 public int compare(String a, String b) { 425 return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo( 426 Integer.valueOf(b.substring(b.lastIndexOf("@") + 1))); 427 } 428 }); 429 startTime = getCoordAction(tempList.get(0)).getCreatedTime(); 430 endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, tempList.get(0), 431 tempList.get(tempList.size() - 1)); 432 filter.setActionList(true); 433 } 434 } 435 // if coordinator action logs are to be retrieved based on date range 436 // this block gets the corresponding list of coordinator actions to be used by the log filter 437 if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) { 438 List<String> coordActionIdList = null; 439 try { 440 coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope); 441 } 442 catch (XException xe) { 443 throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe); 444 } 445 if(coordActionIdList.size() >= maxNumActionsForLog) { 446 throw new CommandException(ErrorCode.E0302, 447 "Retrieving log of too many coordinator actions. Max count is " 448 + maxNumActionsForLog + " actions"); 449 } 450 StringBuilder orSeparatedActions = new StringBuilder(""); 451 boolean orRequired = false; 452 for (String coordActionId : coordActionIdList) { 453 if (orRequired) { 454 orSeparatedActions.append("|"); 455 } 456 orSeparatedActions.append(coordActionId); 457 orRequired = true; 458 } 459 if (coordActionIdList.size() > 1 && orRequired) { 460 orSeparatedActions.insert(0, "("); 461 orSeparatedActions.append(")"); 462 } 463 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString()); 464 if (coordActionIdList != null && coordActionIdList.size() == 1) { 465 CoordinatorActionBean actionBean = getCoordAction(coordActionIdList.get(0)); 466 startTime = actionBean.getCreatedTime(); 467 endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean 468 .getLastModifiedTime(); 469 filter.setActionList(true); 470 } 471 else if (coordActionIdList != null && coordActionIdList.size() > 0) { 472 Collections.sort(coordActionIdList, new Comparator<String>() { 473 public int compare(String a, String b) { 474 return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo( 475 Integer.valueOf(b.substring(b.lastIndexOf("@") + 1))); 476 } 477 }); 478 startTime = getCoordAction(coordActionIdList.get(0)).getCreatedTime(); 479 endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, coordActionIdList.get(0), 480 coordActionIdList.get(coordActionIdList.size() - 1)); 481 filter.setActionList(true); 482 } 483 } 484 } 485 if (startTime == null || endTime == null) { 486 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); 487 if (startTime == null) { 488 startTime = job.getCreatedTime(); 489 } 490 if (endTime == null) { 491 if (job.isTerminalStatus()) { 492 endTime = job.getLastModifiedTime(); 493 } 494 if (endTime == null) { 495 endTime = new Date(); 496 } 497 } 498 } 499 Services.get().get(XLogStreamingService.class).streamLog(new XLogStreamer(filter, requestParameters), startTime, 500 endTime, writer); 501 } 502 503 /* 504 * (non-Javadoc) 505 * 506 * @see 507 * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration 508 * , boolean) 509 */ 510 @Override 511 public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException { 512 try { 513 CoordSubmitXCommand submit = new CoordSubmitXCommand(conf); 514 return submit.call(); 515 } 516 catch (CommandException ex) { 517 throw new CoordinatorEngineException(ex); 518 } 519 } 520 521 /* 522 * (non-Javadoc) 523 * 524 * @see 525 * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration) 526 */ 527 @Override 528 public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException { 529 try { 530 CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf); 531 return submit.call(); 532 } 533 catch (CommandException ex) { 534 throw new CoordinatorEngineException(ex); 535 } 536 } 537 538 /* 539 * (non-Javadoc) 540 * 541 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String) 542 */ 543 @Override 544 public void suspend(String jobId) throws CoordinatorEngineException { 545 try { 546 new CoordSuspendXCommand(jobId).call(); 547 } 548 catch (CommandException e) { 549 throw new CoordinatorEngineException(e); 550 } 551 552 } 553 554 /* 555 * (non-Javadoc) 556 * 557 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String) 558 */ 559 @Override 560 public WorkflowJob getJob(String jobId) throws BaseEngineException { 561 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine")); 562 } 563 564 /* 565 * (non-Javadoc) 566 * 567 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int) 568 */ 569 @Override 570 public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException { 571 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine")); 572 } 573 574 private static final Set<String> FILTER_NAMES = new HashSet<String>(); 575 576 static { 577 FILTER_NAMES.add(OozieClient.FILTER_USER); 578 FILTER_NAMES.add(OozieClient.FILTER_NAME); 579 FILTER_NAMES.add(OozieClient.FILTER_GROUP); 580 FILTER_NAMES.add(OozieClient.FILTER_STATUS); 581 FILTER_NAMES.add(OozieClient.FILTER_ID); 582 FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY); 583 FILTER_NAMES.add(OozieClient.FILTER_UNIT); 584 FILTER_NAMES.add(OozieClient.FILTER_SORT_BY); 585 FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_START); 586 FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_END); 587 FILTER_NAMES.add(OozieClient.FILTER_TEXT); 588 } 589 590 /** 591 * @param filter he filter to parse. Elements must be semicolon-separated name=value pairs. 592 * Supported names are in{@link CoordinatorEngine#FILTER_NAMES}. 593 * @param start start from this job in the coordinator 594 * @param len maximum number of results 595 * @return CoordinatorJobInfo 596 * @throws CoordinatorEngineException if the job info could no be retrieved 597 */ 598 public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException { 599 Map<String, List<String>> filterList = parseJobsFilter(filter); 600 601 try { 602 return new CoordJobsXCommand(filterList, start, len).call(); 603 } 604 catch (CommandException ex) { 605 throw new CoordinatorEngineException(ex); 606 } 607 } 608 609 // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values 610 public Map<Pair<String, FILTER_COMPARATORS>, List<Object>> parseJobFilter(String filter) throws 611 CoordinatorEngineException { 612 Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = new HashMap<Pair<String, 613 FILTER_COMPARATORS>, List<Object>>(); 614 if (filter != null) { 615 //split name value pairs 616 StringTokenizer st = new StringTokenizer(filter, ";"); 617 while (st.hasMoreTokens()) { 618 String token = st.nextToken().trim(); 619 Pair<String, FILTER_COMPARATORS> pair = null; 620 for (FILTER_COMPARATORS comp : FILTER_COMPARATORS.values()) { 621 if (token.contains(comp.getSign())) { 622 int index = token.indexOf(comp.getSign()); 623 String key = token.substring(0, index); 624 String valueStr = token.substring(index + comp.getSign().length()); 625 Object value; 626 627 if (key.equalsIgnoreCase(OozieClient.FILTER_STATUS)) { 628 value = valueStr.toUpperCase(); 629 try { 630 CoordinatorAction.Status.valueOf((String) value); 631 } catch (IllegalArgumentException ex) { 632 // Check for incorrect status value 633 throw new CoordinatorEngineException(ErrorCode.E0421, filter, 634 XLog.format("invalid status value [{0}]." + " Valid status values are: [{1}]", 635 valueStr, StringUtils.join(CoordinatorAction.Status.values(), ", "))); 636 } 637 638 if (!(comp == FILTER_COMPARATORS.EQUALS || comp == FILTER_COMPARATORS.NOT_EQUALS)) { 639 throw new CoordinatorEngineException(ErrorCode.E0421, filter, 640 XLog.format("invalid comparator [{0}] for status." + " Valid are = and !=", 641 comp.getSign())); 642 } 643 644 pair = Pair.of(OozieClient.FILTER_STATUS, comp); 645 } else if (key.equalsIgnoreCase(OozieClient.FILTER_NOMINAL_TIME)) { 646 try { 647 value = new Timestamp(DateUtils.parseDateUTC(valueStr).getTime()); 648 } catch (ParseException e) { 649 throw new CoordinatorEngineException(ErrorCode.E0421, filter, 650 XLog.format("invalid nominal time [{0}]." + " Valid format: " + 651 "[{1}]", valueStr, DateUtils.ISO8601_UTC_MASK)); 652 } 653 pair = Pair.of(OozieClient.FILTER_NOMINAL_TIME, comp); 654 } else { 655 // Check for incorrect filter option 656 throw new CoordinatorEngineException(ErrorCode.E0421, filter, 657 XLog.format("invalid filter [{0}]." + " Valid filters [{1}]", key, StringUtils.join 658 (VALID_JOB_FILTERS, ", "))); 659 } 660 if (!filterMap.containsKey(pair)) { 661 filterMap.put(pair, new ArrayList<Object>()); 662 } 663 filterMap.get(pair).add(value); 664 break; 665 } 666 } 667 668 if (pair == null) { 669 //token doesn't contain comparator 670 throw new CoordinatorEngineException(ErrorCode.E0421, filter, 671 "filter should be of format <key><comparator><value> pairs"); 672 } 673 } 674 } 675 return filterMap; 676 } 677 678 /** 679 * @param filter the filter to parse. Elements must be semicolon-separated name=value pairs. 680 * Supported names are in{@link CoordinatorEngine#FILTER_NAMES}. 681 * @return Map<String, List<String>> map of parsed filters 682 * @throws CoordinatorEngineException if the parameter could not be parsed 683 */ 684 @VisibleForTesting 685 Map<String, List<String>> parseJobsFilter(String filter) throws CoordinatorEngineException { 686 Map<String, List<String>> map = new HashMap<String, List<String>>(); 687 boolean isTimeUnitSpecified = false; 688 String timeUnit = "MINUTE"; 689 boolean isFrequencySpecified = false; 690 String frequency = ""; 691 if (filter != null) { 692 StringTokenizer st = new StringTokenizer(filter, ";"); 693 while (st.hasMoreTokens()) { 694 String token = st.nextToken(); 695 if (token.contains("=")) { 696 String[] pair = token.split("="); 697 if (pair.length != 2) { 698 throw new CoordinatorEngineException(ErrorCode.E0420, filter, 699 "elements must be semicolon-separated name=value pairs"); 700 } 701 pair[0] = pair[0].toLowerCase(); 702 if (!FILTER_NAMES.contains(pair[0])) { 703 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]", 704 pair[0])); 705 } 706 if (pair[0].equalsIgnoreCase("frequency")) { 707 isFrequencySpecified = true; 708 try { 709 frequency = (int) Float.parseFloat(pair[1]) + ""; 710 continue; 711 } 712 catch (NumberFormatException NANException) { 713 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( 714 "invalid value [{0}] for frequency. A numerical value is expected", pair[1])); 715 } 716 } 717 if (pair[0].equalsIgnoreCase("unit")) { 718 isTimeUnitSpecified = true; 719 timeUnit = pair[1]; 720 if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days") 721 && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) { 722 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( 723 "invalid value [{0}] for time unit. " 724 + "Valid value is one of months, days, hours or minutes", pair[1])); 725 } 726 continue; 727 } 728 if (pair[0].equals("status")) { 729 try { 730 CoordinatorJob.Status.valueOf(pair[1]); 731 } 732 catch (IllegalArgumentException ex) { 733 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( 734 "invalid status [{0}]", pair[1])); 735 } 736 } 737 List<String> list = map.get(pair[0]); 738 if (list == null) { 739 list = new ArrayList<String>(); 740 map.put(pair[0], list); 741 } 742 list.add(pair[1]); 743 } else { 744 throw new CoordinatorEngineException(ErrorCode.E0420, filter, 745 "elements must be semicolon-separated name=value pairs"); 746 } 747 } 748 // Unit is specified and frequency is not specified 749 if (!isFrequencySpecified && isTimeUnitSpecified) { 750 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when " 751 + "frequency is specified. Either specify frequency also or else remove the time unit"); 752 } else if (isFrequencySpecified) { 753 // Frequency value is specified 754 if (isTimeUnitSpecified) { 755 if (timeUnit.equalsIgnoreCase("months")) { 756 timeUnit = "MONTH"; 757 } else if (timeUnit.equalsIgnoreCase("days")) { 758 timeUnit = "DAY"; 759 } else if (timeUnit.equalsIgnoreCase("hours")) { 760 // When job details are persisted to database, frequency in hours are converted to minutes. 761 // This conversion is to conform with that. 762 frequency = Integer.parseInt(frequency) * 60 + ""; 763 timeUnit = "MINUTE"; 764 } else if (timeUnit.equalsIgnoreCase("minutes")) { 765 timeUnit = "MINUTE"; 766 } 767 } 768 // Adding the frequency and time unit filters to the filter map 769 List<String> list = new ArrayList<String>(); 770 list.add(timeUnit); 771 map.put("unit", list); 772 list = new ArrayList<String>(); 773 list.add(frequency); 774 map.put("frequency", list); 775 } 776 } 777 return map; 778 } 779 780 public List<WorkflowJobBean> getReruns(String coordActionId) throws CoordinatorEngineException { 781 List<WorkflowJobBean> wfBeans; 782 try { 783 wfBeans = WorkflowJobQueryExecutor.getInstance().getList(WorkflowJobQuery.GET_WORKFLOWS_PARENT_COORD_RERUN, 784 coordActionId); 785 } 786 catch (JPAExecutorException e) { 787 throw new CoordinatorEngineException(e); 788 } 789 return wfBeans; 790 } 791 792 /** 793 * Update coord job definition. 794 * 795 * @param conf the conf 796 * @param jobId the job id 797 * @param dryrun the dryrun 798 * @param showDiff the show diff 799 * @return the string 800 * @throws CoordinatorEngineException the coordinator engine exception 801 */ 802 public String updateJob(Configuration conf, String jobId, boolean dryrun, boolean showDiff) 803 throws CoordinatorEngineException { 804 try { 805 CoordUpdateXCommand update = new CoordUpdateXCommand(dryrun, conf, jobId, showDiff); 806 return update.call(); 807 } 808 catch (CommandException ex) { 809 throw new CoordinatorEngineException(ex); 810 } 811 } 812 813 /** 814 * Return the status for a Job ID 815 * 816 * @param jobId job Id. 817 * @return the job's status 818 * @throws CoordinatorEngineException thrown if the job's status could not be obtained 819 */ 820 @Override 821 public String getJobStatus(String jobId) throws CoordinatorEngineException { 822 try { 823 CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get( 824 CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_STATUS, jobId); 825 return coordJob.getStatusStr(); 826 } 827 catch (JPAExecutorException e) { 828 throw new CoordinatorEngineException(e); 829 } 830 } 831 832 /** 833 * Return the status for an Action ID 834 * 835 * @param actionId action Id. 836 * @return the action's status 837 * @throws CoordinatorEngineException thrown if the action's status could not be obtained 838 */ 839 public String getActionStatus(String actionId) throws CoordinatorEngineException { 840 try { 841 CoordinatorActionBean coordAction = CoordActionQueryExecutor.getInstance().get( 842 CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION_STATUS, actionId); 843 return coordAction.getStatusStr(); 844 } 845 catch (JPAExecutorException e) { 846 throw new CoordinatorEngineException(e); 847 } 848 } 849 850 @Override 851 public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { 852 try { 853 new CoordSLAAlertsDisableXCommand(id, actions, dates).call(); 854 855 } 856 catch (CommandException e) { 857 throw new CoordinatorEngineException(e); 858 } 859 } 860 861 @Override 862 public void changeSLA(String id, String actions, String dates, String childIds, String newParams) 863 throws BaseEngineException { 864 Map<String, String> slaNewParams = null; 865 866 try { 867 868 if (newParams != null) { 869 slaNewParams = JobUtils.parseChangeValue(newParams); 870 } 871 872 new CoordSLAChangeXCommand(id, actions, dates, slaNewParams).call(); 873 874 } 875 catch (CommandException e) { 876 throw new CoordinatorEngineException(e); 877 } 878 } 879 880 @Override 881 public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { 882 try { 883 new CoordSLAAlertsEnableXCommand(id, actions, dates).call(); 884 885 } 886 catch (CommandException e) { 887 throw new CoordinatorEngineException(e); 888 } 889 } 890 891 /** 892 * return a list of killed Coordinator job 893 * 894 * @param filter the filter string for which the coordinator jobs are killed 895 * @param start the starting index for coordinator jobs 896 * @param length maximum number of jobs to be killed 897 * @return coordinatorJobInfo the list of jobs being killed 898 * @throws CoordinatorEngineException thrown if one or more of the jobs cannot be killed 899 */ 900 public CoordinatorJobInfo killJobs(String filter, int start, int length) throws CoordinatorEngineException { 901 try { 902 Map<String, List<String>> filterMap = parseJobsFilter(filter); 903 CoordinatorJobInfo coordinatorJobInfo = 904 new BulkCoordXCommand(filterMap, start, length, OperationType.Kill).call(); 905 if (coordinatorJobInfo == null) { 906 return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); 907 } 908 return coordinatorJobInfo; 909 } 910 catch (CommandException ex) { 911 throw new CoordinatorEngineException(ex); 912 } 913 } 914 915 /** 916 * return the jobs that've been suspended 917 * @param filter Filter for jobs that will be suspended, can be name, user, group, status, id or combination of any 918 * @param start Offset for the jobs that will be suspended 919 * @param length maximum number of jobs that will be suspended 920 * @return coordinatorJobInfo 921 * @throws CoordinatorEngineException if the jobs could not be suspended 922 */ 923 public CoordinatorJobInfo suspendJobs(String filter, int start, int length) throws CoordinatorEngineException { 924 try { 925 Map<String, List<String>> filterMap = parseJobsFilter(filter); 926 CoordinatorJobInfo coordinatorJobInfo = 927 new BulkCoordXCommand(filterMap, start, length, OperationType.Suspend).call(); 928 if (coordinatorJobInfo == null) { 929 return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); 930 } 931 return coordinatorJobInfo; 932 } 933 catch (CommandException ex) { 934 throw new CoordinatorEngineException(ex); 935 } 936 } 937 938 /** 939 * return the jobs that've been resumed 940 * @param filter Filter for jobs that will be resumed, can be name, user, group, status, id or combination of any 941 * @param start Offset for the jobs that will be resumed 942 * @param length maximum number of jobs that will be resumed 943 * @return coordinatorJobInfo returns resumed jobs 944 * @throws CoordinatorEngineException if the jobs could not be resumed 945 */ 946 public CoordinatorJobInfo resumeJobs(String filter, int start, int length) throws CoordinatorEngineException { 947 try { 948 Map<String, List<String>> filterMap = parseJobsFilter(filter); 949 CoordinatorJobInfo coordinatorJobInfo = 950 new BulkCoordXCommand(filterMap, start, length, OperationType.Resume).call(); 951 if (coordinatorJobInfo == null) { 952 return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); 953 } 954 return coordinatorJobInfo; 955 } 956 catch (CommandException ex) { 957 throw new CoordinatorEngineException(ex); 958 } 959 } 960 /** 961 * Get coord action missing dependencies 962 * @param id jobID 963 * @param actions action list 964 * @param dates nominal time list 965 * @return CoordActionMissingDependenciesXCommand pair of coord action bean and 966 * list of missing input dependencies. 967 * @throws CommandException if the actions could not be retrieved 968 */ 969 public List<Pair<CoordinatorActionBean, Map<String, ActionDependency>>> getCoordActionMissingDependencies(String id, 970 String actions, String dates) throws CommandException { 971 return new CoordActionMissingDependenciesXCommand(id, actions, dates).call(); 972 } 973 974 /** 975 * get wf actions by action name in a coordinator job 976 * @param jobId coordinator job id 977 * @param wfActionName workflow action name 978 * @param offset offset in the coordinator job 979 * @param len maximum number of results 980 * @return CoordWfActionInfoXCommand list of CoordinatorWfActionBean in a coordinator 981 * @throws CoordinatorEngineException if the actions could not be retrieved 982 */ 983 public List<CoordinatorWfActionBean> getWfActionByJobIdAndName(String jobId, String wfActionName, int offset, int len) 984 throws CoordinatorEngineException { 985 try { 986 return new CoordWfActionInfoXCommand(jobId, wfActionName, offset, len).call(); 987 } 988 catch (CommandException ex) { 989 throw new CoordinatorEngineException(ex); 990 } 991 } 992}