/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie;

import java.io.IOException;
import java.io.Writer;
import java.sql.Timestamp;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.OperationType;
import org.apache.oozie.command.coord.BulkCoordXCommand;
import org.apache.oozie.command.coord.CoordActionInfoXCommand;
import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand;
import org.apache.oozie.command.coord.CoordActionsKillXCommand;
import org.apache.oozie.command.coord.CoordChangeXCommand;
import org.apache.oozie.command.coord.CoordJobXCommand;
import org.apache.oozie.command.coord.CoordJobsXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordRerunXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordSLAAlertsDisableXCommand;
import org.apache.oozie.command.coord.CoordSLAAlertsEnableXCommand;
import org.apache.oozie.command.coord.CoordSLAChangeXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.command.coord.CoordUpdateXCommand;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.util.CoordActionsInDateRange;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.Pair;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogAuditFilter;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogUserFilterParam;

import com.google.common.annotations.VisibleForTesting;

/**
 * Engine facade for coordinator jobs: each operation is delegated to the matching coordinator
 * command or query executor and command/JPA failures are translated into engine exceptions.
 */
public class CoordinatorEngine extends BaseEngine {
    private static final XLog LOG = XLog.getLog(CoordinatorEngine.class);
    public final static String COORD_ACTIONS_LOG_MAX_COUNT = "oozie.coord.actions.log.max.count";
    private final static int COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT = 50;
    private final int maxNumActionsForLog;

    /**
     * Orders coordinator action ids (<code>jobId@N</code>) by the integer that follows the last '@'.
     */
    private static final Comparator<String> ACTION_NUMBER_ORDER = new Comparator<String>() {
        @Override
        public int compare(String a, String b) {
            return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo(
                    Integer.valueOf(b.substring(b.lastIndexOf("@") + 1)));
        }
    };

    public enum FILTER_COMPARATORS {
        // This ordering is important, don't change it: parseJobFilter() picks the first sign contained in a
        // token, so two-character signs must be tested before the one-character signs they contain.
        GREATER_EQUAL(">="), GREATER(">"), LESSTHAN_EQUAL("<="), LESSTHAN("<"), NOT_EQUALS("!="), EQUALS("=");

        private final String sign;

        FILTER_COMPARATORS(String sign) {
            this.sign = sign;
        }

        public String getSign() {
            return sign;
        }
    }

    public static final String[] VALID_JOB_FILTERS = {OozieClient.FILTER_STATUS, OozieClient.FILTER_NOMINAL_TIME};

    /**
     * Create a system Coordinator engine, with no user and no group.
     * <p>
     * Reads {@link #COORD_ACTIONS_LOG_MAX_COUNT} from the services configuration (default 50).
     */
    public CoordinatorEngine() {
        maxNumActionsForLog = Services.get().getConf()
                .getInt(COORD_ACTIONS_LOG_MAX_COUNT, COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT);
    }

    /**
     * Create a Coordinator engine to perform operations on behalf of a user.
     *
     * @param user user name, must not be empty.
     */
    public CoordinatorEngine(String user) {
        this();
        this.user = ParamChecker.notEmpty(user, "user");
    }

    /**
     * Return the original job XML of a coordinator job.
     *
     * @param jobId coordinator job id
     * @return the original job XML as stored on the job bean
     * @throws BaseEngineException thrown if the job could not be loaded
     */
    @Override
    public String getDefinition(String jobId) throws BaseEngineException {
        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
        return job.getOrigJobXml();
    }

    /**
     * Load a coordinator job through {@link CoordJobXCommand} using only the job id.
     *
     * @param jobId coordinator job id
     * @return the coordinator job bean
     * @throws BaseEngineException thrown if the command failed
     */
    private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
        try {
            return new CoordJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /**
     * Load a coordinator action through {@link CoordActionInfoXCommand}.
     *
     * @param actionId coordinator action id
     * @return the coordinator action bean
     * @throws BaseEngineException thrown if the command failed
     */
    public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
        try {
            return new CoordActionInfoXCommand(actionId).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /**
     * Load a coordinator job by id.
     *
     * @param jobId coordinator job id
     * @return the coordinator job bean
     * @throws BaseEngineException thrown if the command failed
     */
    @Override
    public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
        return getCoordJobWithNoActionInfo(jobId);
    }

    /**
     * Load a coordinator job, passing an action filter and paging parameters to {@link CoordJobXCommand}.
     *
     * @param jobId coordinator job id
     * @param filter action filter string, parsed by {@link #parseJobFilter(String)}; may be null
     * @param offset offset passed to the command
     * @param length length passed to the command
     * @param desc ordering flag passed to the command
     * @return the coordinator job bean
     * @throws BaseEngineException thrown if the filter is invalid or the command failed
     */
    @Override
    public CoordinatorJobBean getCoordJob(String jobId, String filter, int offset, int length, boolean desc)
            throws BaseEngineException {
        Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = parseJobFilter(filter);
        try {
            return new CoordJobXCommand(jobId, filterMap, offset, length, desc).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /**
     * External ids are not resolved by this engine.
     *
     * @param externalId ignored
     * @return always <code>null</code>
     */
    @Override
    public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
        return null;
    }

    /**
     * Kill a coordinator job.
     *
     * @param jobId coordinator job id
     * @throws CoordinatorEngineException thrown if the kill command failed
     */
    @Override
    public void kill(String jobId) throws CoordinatorEngineException {
        try {
            new CoordKillXCommand(jobId).call();
            LOG.info("User " + user + " killed the Coordinator job " + jobId);
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Kill a set of coordinator actions of a job.
     *
     * @param jobId coordinator job id
     * @param rangeType range type passed to {@link CoordActionsKillXCommand}
     * @param scope scope passed to {@link CoordActionsKillXCommand}
     * @return the result of the kill command
     * @throws CoordinatorEngineException thrown if the command failed
     */
    public CoordinatorActionInfo killActions(String jobId, String rangeType, String scope)
            throws CoordinatorEngineException {
        try {
            return new CoordActionsKillXCommand(jobId, rangeType, scope).call();
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Change a coordinator job.
     *
     * @param jobId coordinator job id
     * @param changeValue change specification passed to {@link CoordChangeXCommand}
     * @throws CoordinatorEngineException thrown if the change command failed
     */
    @Override
    public void change(String jobId, String changeValue) throws CoordinatorEngineException {
        try {
            new CoordChangeXCommand(jobId, changeValue).call();
            LOG.info("User " + user + " changed the Coordinator job [" + jobId + "] to " + changeValue);
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Ignore a set of coordinator actions of a job. The request is logged before the command runs.
     *
     * @param jobId coordinator job id
     * @param type type passed to {@link CoordActionsIgnoreXCommand}
     * @param scope scope passed to {@link CoordActionsIgnoreXCommand}
     * @return the result of the ignore command
     * @throws CoordinatorEngineException thrown if the command failed
     */
    public CoordinatorActionInfo ignore(String jobId, String type, String scope) throws CoordinatorEngineException {
        try {
            LOG.info("User " + user + " ignore a Coordinator Action (s) [" + scope + "] of the Coordinator Job ["
                    + jobId + "]");
            return new CoordActionsIgnoreXCommand(jobId, type, scope).call();
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Not supported for coordinators; always fails with {@link ErrorCode#E0301}.
     */
    @Override
    @Deprecated
    public void reRun(String jobId, Configuration conf) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun"));
    }

    /**
     * Rerun coordinator actions for given rerunType
     *
     * @param jobId coordinator job id
     * @param rerunType rerun type passed to {@link CoordRerunXCommand}
     * @param scope scope passed to {@link CoordRerunXCommand}
     * @param refresh refresh flag passed to {@link CoordRerunXCommand}
     * @param noCleanup no-cleanup flag passed to {@link CoordRerunXCommand}
     * @param failed failed flag passed to {@link CoordRerunXCommand}
     * @param conf configuration passed to {@link CoordRerunXCommand}
     * @return the result of the rerun command
     * @throws BaseEngineException thrown if the command failed
     */
    public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh,
            boolean noCleanup, boolean failed, Configuration conf)
            throws BaseEngineException {
        try {
            return new CoordRerunXCommand(jobId, rerunType, scope, refresh,
                    noCleanup, failed, conf).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /**
     * Resume a coordinator job.
     *
     * @param jobId coordinator job id
     * @throws CoordinatorEngineException thrown if the resume command failed
     */
    @Override
    public void resume(String jobId) throws CoordinatorEngineException {
        try {
            new CoordResumeXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Not supported for coordinators; always fails with {@link ErrorCode#E0301}.
     */
    @Override
    @Deprecated
    public void start(String jobId) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
    }

    /**
     * Stream the job log ({@code LOG_TYPE.LOG}) of a coordinator job to the writer.
     */
    @Override
    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BaseEngineException {
        streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
    }

    /**
     * Stream the error log ({@code LOG_TYPE.ERROR_LOG}) of a coordinator job to the writer.
     */
    @Override
    public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BaseEngineException {
        streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
    }

    /**
     * Stream the audit log ({@code LOG_TYPE.AUDIT_LOG}) of a coordinator job to the writer, using an
     * {@link XLogAuditFilter}. A {@link CommandException} is rethrown wrapped in an {@link IOException}.
     */
    @Override
    public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BaseEngineException {
        try {
            streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params,
                    LOG_TYPE.AUDIT_LOG);
        }
        catch (CommandException e) {
            throw new IOException(e);
        }
    }

    /**
     * Stream a job log using a plain {@link XLogFilter} built from the request parameters.
     * Any exception is rethrown wrapped in an {@link IOException}.
     */
    private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
            throws IOException, BaseEngineException {
        try {
            streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Stream a job log between the job's created time and either its last modified time (when the job is in
     * a terminal status and that time is set) or now. Any exception is rethrown wrapped in an
     * {@link IOException}.
     */
    private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params,
            LOG_TYPE logType) throws IOException, BaseEngineException {
        try {
            filter.setParameter(DagXLogInfoService.JOB, jobId);
            Date lastTime = null;
            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
            if (job.isTerminalStatus()) {
                lastTime = job.getLastModifiedTime();
            }
            if (lastTime == null) {
                lastTime = new Date();
            }
            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Stream the log of a coordinator job, optionally restricted to a list of actions.
     * <p>
     * When both scope and type are given, the actions are selected either by action number
     * (comma-separated numbers and <code>start-end</code> ranges) or by date range, and the log time window
     * is narrowed to those actions. Otherwise the window is the job's created time up to its last modified
     * time (terminal jobs) or now.
     *
     * @param jobId Job Id
     * @param logRetrievalScope Value for the retrieval type
     * @param logRetrievalType Based on which filter criteria the log is retrieved
     * @param writer writer to stream the log to
     * @param params additional parameters from the request
     * @throws IOException declared for the streaming call
     * @throws BaseEngineException thrown if a job or action could not be loaded
     * @throws CommandException thrown if the scope is malformed or selects too many actions
     */
    public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer,
            Map<String, String[]> params) throws IOException, BaseEngineException, CommandException {

        Date startTime = null;
        Date endTime = null;
        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));

        filter.setParameter(DagXLogInfoService.JOB, jobId);
        if (logRetrievalScope != null && logRetrievalType != null) {
            // if coordinator action logs are to be retrieved based on action id range
            if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
                List<String> actionIds = parseActionScope(jobId, logRetrievalScope);
                LogTimeRange range = restrictToActions(filter, jobId, actionIds);
                if (range != null) {
                    startTime = range.start;
                    endTime = range.end;
                }
            }
            // if coordinator action logs are to be retrieved based on date range
            // this block gets the corresponding list of coordinator actions to be used by the log filter
            if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
                List<String> coordActionIdList;
                try {
                    coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId,
                            logRetrievalScope);
                }
                catch (XException xe) {
                    throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
                }
                LogTimeRange range = restrictToActions(filter, jobId, coordActionIdList);
                if (range != null) {
                    startTime = range.start;
                    endTime = range.end;
                }
            }
        }
        if (startTime == null || endTime == null) {
            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
            if (startTime == null) {
                startTime = job.getCreatedTime();
            }
            if (endTime == null) {
                if (job.isTerminalStatus()) {
                    endTime = job.getLastModifiedTime();
                }
                if (endTime == null) {
                    endTime = new Date();
                }
            }
        }
        Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params);
    }

    /** Start/end of the log window derived from a list of coordinator actions. */
    private static final class LogTimeRange {
        private final Date start;
        private final Date end;

        private LogTimeRange(Date start, Date end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Expand an action scope (e.g. <code>1,3-5</code>) into de-duplicated action ids <code>jobId@N</code>,
     * in order of first appearance.
     *
     * @param jobId coordinator job id used as the id prefix
     * @param scope comma-separated action numbers and/or <code>start-end</code> ranges
     * @return the action ids selected by the scope
     * @throws CommandException thrown with E0302 if an element is malformed or the limit is reached
     */
    private List<String> parseActionScope(String jobId, String scope) throws CommandException {
        // Use set implementation that maintains order of elements to achieve reproducibility:
        Set<String> actionSet = new LinkedHashSet<String>();
        for (String element : scope.split(",")) {
            String s = element.trim();
            if (s.contains("-")) {
                String[] range = s.split("-");
                if (range.length != 2) {
                    throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
                }
                int start = parseRangeBound(range[0].trim());
                int end = parseRangeBound(range[1].trim());
                if (start > end) {
                    throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
                }
                for (int i = start; i <= end; i++) {
                    actionSet.add(jobId + "@" + i);
                    // Fail as soon as the limit is hit: the set only grows, so the request would be rejected
                    // anyway, and this keeps a huge range from exhausting memory (or never terminating when
                    // end == Integer.MAX_VALUE).
                    if (actionSet.size() >= maxNumActionsForLog) {
                        throw tooManyActions();
                    }
                }
            }
            else {
                int actionNumber;
                try {
                    actionNumber = Integer.parseInt(s);
                }
                catch (NumberFormatException ne) {
                    throw new CommandException(ErrorCode.E0302, "format is wrong for action id '" + s
                            + "'. Integer only.", ne);
                }
                // Use the parsed number, not the raw text, so inputs such as "+5" or "007" yield the canonical
                // action id (and do not leak '+' into the filter pattern).
                actionSet.add(jobId + "@" + actionNumber);
            }
        }
        return new ArrayList<String>(actionSet);
    }

    /**
     * Parse one bound of an action range.
     *
     * @param text trimmed bound text
     * @return the parsed integer
     * @throws CommandException thrown with E0302 if the text is not an integer
     */
    private static int parseRangeBound(String text) throws CommandException {
        try {
            return Integer.parseInt(text);
        }
        catch (NumberFormatException ne) {
            throw new CommandException(ErrorCode.E0302, "could not parse " + text + " into an integer", ne);
        }
    }

    /**
     * Build the exception reported when a request selects too many actions for log retrieval.
     */
    private CommandException tooManyActions() {
        return new CommandException(ErrorCode.E0302,
                "Retrieving log of too many coordinator actions. Max count is "
                        + maxNumActionsForLog + " actions");
    }

    /**
     * Restrict the log filter to the given actions and compute the log window covering them.
     * <p>
     * The ACTION filter parameter is set to the ids joined with '|' (wrapped in parentheses when there is
     * more than one). For a single action the window ends now if the action is RUNNING, else at its last
     * modified time; for several actions the ids are ordered by action number and the window runs from the
     * first action's created time to the last modified date reported by {@link CoordActionsInDateRange}.
     *
     * @param filter log filter to update
     * @param jobId coordinator job id
     * @param actionIds action ids (<code>jobId@N</code>) to restrict the log to
     * @return the log window, or <code>null</code> if the list is empty
     * @throws IOException declared for parity with the caller
     * @throws BaseEngineException thrown if an action could not be loaded
     * @throws CommandException thrown with E0302 if too many actions are selected
     */
    private LogTimeRange restrictToActions(XLogFilter filter, String jobId, List<String> actionIds)
            throws IOException, BaseEngineException, CommandException {
        // NOTE(review): '>=' rejects a request for exactly the configured maximum although the message says
        // "Max count is N" - kept as-is to preserve behavior; confirm whether '>' was intended.
        if (actionIds.size() >= maxNumActionsForLog) {
            throw tooManyActions();
        }
        StringBuilder orSeparatedActions = new StringBuilder("");
        boolean orRequired = false;
        for (String actionId : actionIds) {
            if (orRequired) {
                orSeparatedActions.append("|");
            }
            orSeparatedActions.append(actionId);
            orRequired = true;
        }
        if (actionIds.size() > 1) {
            orSeparatedActions.insert(0, "(");
            orSeparatedActions.append(")");
        }
        filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());

        if (actionIds.isEmpty()) {
            return null;
        }
        Date startTime;
        Date endTime;
        if (actionIds.size() == 1) {
            CoordinatorActionBean actionBean = getCoordAction(actionIds.get(0));
            startTime = actionBean.getCreatedTime();
            endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean
                    .getLastModifiedTime();
        }
        else {
            // Sort a copy so the caller's list (and the filter pattern order) is left untouched.
            List<String> sortedIds = new ArrayList<String>(actionIds);
            Collections.sort(sortedIds, ACTION_NUMBER_ORDER);
            startTime = getCoordAction(sortedIds.get(0)).getCreatedTime();
            endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, sortedIds.get(0),
                    sortedIds.get(sortedIds.size() - 1));
        }
        filter.setActionList(true);
        return new LogTimeRange(startTime, endTime);
    }

    /**
     * Submit a coordinator job.
     *
     * @param conf job configuration
     * @param startJob not used by this implementation
     * @return the result of {@link CoordSubmitXCommand}
     * @throws CoordinatorEngineException thrown if the submit command failed
     */
    @Override
    public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
        try {
            CoordSubmitXCommand submit = new CoordSubmitXCommand(conf);
            return submit.call();
        }
        catch (CommandException ex) {
            throw new CoordinatorEngineException(ex);
        }
    }

    /**
     * Run a coordinator submission in dry-run mode.
     *
     * @param conf job configuration
     * @return the result of the dry-run {@link CoordSubmitXCommand}
     * @throws CoordinatorEngineException thrown if the submit command failed
     */
    @Override
    public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException {
        try {
            CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf);
            return submit.call();
        }
        catch (CommandException ex) {
            throw new CoordinatorEngineException(ex);
        }
    }

    /**
     * Suspend a coordinator job.
     *
     * @param jobId coordinator job id
     * @throws CoordinatorEngineException thrown if the suspend command failed
     */
    @Override
    public void suspend(String jobId) throws CoordinatorEngineException {
        try {
            new CoordSuspendXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Not supported: this engine does not return workflow jobs; always fails with {@link ErrorCode#E0301}.
     */
    @Override
    public WorkflowJob getJob(String jobId) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301,
                "cannot get a workflow job from CoordinatorEngine"));
    }

    /**
     * Not supported: this engine does not return workflow jobs; always fails with {@link ErrorCode#E0301}.
     */
    @Override
    public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301,
                "cannot get a workflow job from CoordinatorEngine"));
    }

    /** Filter names accepted by {@link #parseJobsFilter(String)}. */
    private static final Set<String> FILTER_NAMES = new HashSet<String>();

    static {
        FILTER_NAMES.add(OozieClient.FILTER_USER);
        FILTER_NAMES.add(OozieClient.FILTER_NAME);
        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
        FILTER_NAMES.add(OozieClient.FILTER_ID);
        FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
        FILTER_NAMES.add(OozieClient.FILTER_UNIT);
        FILTER_NAMES.add(OozieClient.FILTER_SORT_BY);
        FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_START);
        FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_END);
    }

    /**
     * List coordinator jobs matching a filter.
     *
     * @param filter jobs filter, parsed by {@link #parseJobsFilter(String)}; may be null
     * @param start start value passed to {@link CoordJobsXCommand}
     * @param len length value passed to {@link CoordJobsXCommand}
     * @return CoordinatorJobInfo returned by the command
     * @throws CoordinatorEngineException thrown if the filter is invalid or the command failed
     */
    public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
        Map<String, List<String>> filterList = parseJobsFilter(filter);

        try {
            return new CoordJobsXCommand(filterList, start, len).call();
        }
        catch (CommandException ex) {
            throw new CoordinatorEngineException(ex);
        }
    }

    /**
     * Parse an action filter string (e.g. <code>status=RUNNING;status!=WAITING</code>) into a map keyed by
     * (filter name, comparator) whose values are the parsed operands.
     * <p>
     * Supported filter names are those in {@link #VALID_JOB_FILTERS}. Status values are upper-cased and must
     * name a {@link CoordinatorAction.Status}; only <code>=</code> and <code>!=</code> are accepted for
     * status. Nominal time values are parsed with {@link DateUtils#parseDateUTC(String)} and stored as
     * {@link Timestamp}.
     *
     * @param filter semicolon-separated <code>&lt;key&gt;&lt;comparator&gt;&lt;value&gt;</code> tokens; may be
     *        null
     * @return the parsed filter map, empty if the filter is null
     * @throws CoordinatorEngineException thrown with E0421 if a token is malformed
     */
    public Map<Pair<String, FILTER_COMPARATORS>, List<Object>> parseJobFilter(String filter) throws
            CoordinatorEngineException {
        Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = new HashMap<Pair<String,
                FILTER_COMPARATORS>, List<Object>>();
        if (filter != null) {
            //split name value pairs
            StringTokenizer st = new StringTokenizer(filter, ";");
            while (st.hasMoreTokens()) {
                String token = st.nextToken().trim();
                Pair<String, FILTER_COMPARATORS> pair = null;
                for (FILTER_COMPARATORS comp : FILTER_COMPARATORS.values()) {
                    if (token.contains(comp.getSign())) {
                        int index = token.indexOf(comp.getSign());
                        // Trim so that "status = RUNNING" is treated like "status=RUNNING".
                        String key = token.substring(0, index).trim();
                        String valueStr = token.substring(index + comp.getSign().length()).trim();
                        Object value;

                        if (key.equalsIgnoreCase(OozieClient.FILTER_STATUS)) {
                            // Locale.ROOT: status names are ASCII enum constants; the default locale could
                            // map e.g. 'i' to a dotted capital I and break valueOf().
                            value = valueStr.toUpperCase(Locale.ROOT);
                            try {
                                CoordinatorAction.Status.valueOf((String) value);
                            }
                            catch (IllegalArgumentException ex) {
                                // Check for incorrect status value
                                throw new CoordinatorEngineException(ErrorCode.E0421, filter,
                                        XLog.format("invalid status value [{0}]." + " Valid status values are: [{1}]",
                                                valueStr, StringUtils.join(CoordinatorAction.Status.values(), ", ")));
                            }

                            if (!(comp == FILTER_COMPARATORS.EQUALS || comp == FILTER_COMPARATORS.NOT_EQUALS)) {
                                throw new CoordinatorEngineException(ErrorCode.E0421, filter,
                                        XLog.format("invalid comparator [{0}] for status." + " Valid are = and !=",
                                                comp.getSign()));
                            }

                            pair = Pair.of(OozieClient.FILTER_STATUS, comp);
                        }
                        else if (key.equalsIgnoreCase(OozieClient.FILTER_NOMINAL_TIME)) {
                            try {
                                value = new Timestamp(DateUtils.parseDateUTC(valueStr).getTime());
                            }
                            catch (ParseException e) {
                                throw new CoordinatorEngineException(ErrorCode.E0421, filter,
                                        XLog.format("invalid nominal time [{0}]." + " Valid format: " +
                                                "[{1}]", valueStr, DateUtils.ISO8601_UTC_MASK));
                            }
                            pair = Pair.of(OozieClient.FILTER_NOMINAL_TIME, comp);
                        }
                        else {
                            // Check for incorrect filter option
                            throw new CoordinatorEngineException(ErrorCode.E0421, filter,
                                    XLog.format("invalid filter [{0}]." + " Valid filters [{1}]", key,
                                            StringUtils.join(VALID_JOB_FILTERS, ", ")));
                        }
                        if (!filterMap.containsKey(pair)) {
                            filterMap.put(pair, new ArrayList<Object>());
                        }
                        filterMap.get(pair).add(value);
                        // Only the first matching comparator (in enum order) applies to a token.
                        break;
                    }
                }

                if (pair == null) {
                    //token doesn't contain comparator
                    throw new CoordinatorEngineException(ErrorCode.E0421, filter,
                            "filter should be of format <key><comparator><value> pairs");
                }
            }
        }
        return filterMap;
    }

    /**
     * Parse a jobs filter string of semicolon-separated <code>name=value</code> pairs.
     * <p>
     * Names are lower-cased and must be one of the accepted filter names. <code>frequency</code> must be
     * numeric (it is truncated to an integer) and <code>unit</code> must be one of months, days, hours or
     * minutes; a unit without a frequency is rejected. When a frequency is given, the resulting map holds
     * "unit" (MONTH, DAY or MINUTE; MINUTE by default) and "frequency" (hours converted to minutes). All
     * other names map to the list of values given for them.
     *
     * @param filter the filter string; may be null
     * @return Map&lt;String, List&lt;String&gt;&gt; of filter name to values, empty if the filter is null
     * @throws CoordinatorEngineException thrown with E0420 if the filter is malformed
     */
    @VisibleForTesting
    Map<String, List<String>> parseJobsFilter(String filter) throws CoordinatorEngineException {
        Map<String, List<String>> map = new HashMap<String, List<String>>();
        boolean isTimeUnitSpecified = false;
        String timeUnit = "MINUTE";
        boolean isFrequencySpecified = false;
        String frequency = "";
        if (filter != null) {
            StringTokenizer st = new StringTokenizer(filter, ";");
            while (st.hasMoreTokens()) {
                String token = st.nextToken();
                if (!token.contains("=")) {
                    throw new CoordinatorEngineException(ErrorCode.E0420, filter,
                            "elements must be semicolon-separated name=value pairs");
                }
                String[] pair = token.split("=");
                if (pair.length != 2) {
                    throw new CoordinatorEngineException(ErrorCode.E0420, filter,
                            "elements must be semicolon-separated name=value pairs");
                }
                // Locale.ROOT: filter names are ASCII; the default locale could break the lookup below
                // (e.g. "ID" lower-cases to a dotless i under a Turkish locale).
                pair[0] = pair[0].toLowerCase(Locale.ROOT);
                if (!FILTER_NAMES.contains(pair[0])) {
                    throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
                            pair[0]));
                }
                if (pair[0].equalsIgnoreCase("frequency")) {
                    isFrequencySpecified = true;
                    try {
                        frequency = (int) Float.parseFloat(pair[1]) + "";
                    }
                    catch (NumberFormatException nfe) {
                        throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
                                "invalid value [{0}] for frequency. A numerical value is expected", pair[1]));
                    }
                    continue;
                }
                if (pair[0].equalsIgnoreCase("unit")) {
                    isTimeUnitSpecified = true;
                    timeUnit = pair[1];
                    if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
                            && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
                        throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
                                "invalid value [{0}] for time unit. "
                                        + "Valid value is one of months, days, hours or minutes", pair[1]));
                    }
                    continue;
                }
                if (pair[0].equals("status")) {
                    try {
                        CoordinatorJob.Status.valueOf(pair[1]);
                    }
                    catch (IllegalArgumentException ex) {
                        throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
                                "invalid status [{0}]", pair[1]));
                    }
                }
                List<String> list = map.get(pair[0]);
                if (list == null) {
                    list = new ArrayList<String>();
                    map.put(pair[0], list);
                }
                list.add(pair[1]);
            }
            // Unit is specified and frequency is not specified
            if (!isFrequencySpecified && isTimeUnitSpecified) {
                throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
                        + "frequency is specified. Either specify frequency also or else remove the time unit");
            }
            else if (isFrequencySpecified) {
                // Frequency value is specified
                if (isTimeUnitSpecified) {
                    if (timeUnit.equalsIgnoreCase("months")) {
                        timeUnit = "MONTH";
                    }
                    else if (timeUnit.equalsIgnoreCase("days")) {
                        timeUnit = "DAY";
                    }
                    else if (timeUnit.equalsIgnoreCase("hours")) {
                        // When job details are persisted to database, frequency in hours are converted to minutes.
                        // This conversion is to conform with that.
                        frequency = Integer.parseInt(frequency) * 60 + "";
                        timeUnit = "MINUTE";
                    }
                    else if (timeUnit.equalsIgnoreCase("minutes")) {
                        timeUnit = "MINUTE";
                    }
                }
                // Adding the frequency and time unit filters to the filter map
                List<String> list = new ArrayList<String>();
                list.add(timeUnit);
                map.put("unit", list);
                list = new ArrayList<String>();
                list.add(frequency);
                map.put("frequency", list);
            }
        }
        return map;
    }

    /**
     * Return the workflow jobs found by the GET_WORKFLOWS_PARENT_COORD_RERUN query for a coordinator action.
     *
     * @param coordActionId coordinator action id
     * @return the workflow job beans returned by the query
     * @throws CoordinatorEngineException thrown if the query failed
     */
    public List<WorkflowJobBean> getReruns(String coordActionId) throws CoordinatorEngineException {
        try {
            return WorkflowJobQueryExecutor.getInstance().getList(WorkflowJobQuery.GET_WORKFLOWS_PARENT_COORD_RERUN,
                    coordActionId);
        }
        catch (JPAExecutorException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Update coord job definition.
     *
     * @param conf the conf
     * @param jobId the job id
     * @param dryrun the dryrun
     * @param showDiff the show diff
     * @return the string returned by {@link CoordUpdateXCommand}
     * @throws CoordinatorEngineException the coordinator engine exception
     */
    public String updateJob(Configuration conf, String jobId, boolean dryrun, boolean showDiff)
            throws CoordinatorEngineException {
        try {
            CoordUpdateXCommand update = new CoordUpdateXCommand(dryrun, conf, jobId, showDiff);
            return update.call();
        }
        catch (CommandException ex) {
            throw new CoordinatorEngineException(ex);
        }
    }

    /**
     * Return the status for a Job ID
     *
     * @param jobId job Id.
     * @return the job's status
     * @throws CoordinatorEngineException thrown if the job's status could not be obtained
     */
    @Override
    public String getJobStatus(String jobId) throws CoordinatorEngineException {
        try {
            CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(
                    CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_STATUS, jobId);
            return coordJob.getStatusStr();
        }
        catch (JPAExecutorException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Return the status for an Action ID
     *
     * @param actionId action Id.
     * @return the action's status
     * @throws CoordinatorEngineException thrown if the action's status could not be obtained
     */
    public String getActionStatus(String actionId) throws CoordinatorEngineException {
        try {
            CoordinatorActionBean coordAction = CoordActionQueryExecutor.getInstance().get(
                    CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
            return coordAction.getStatusStr();
        }
        catch (JPAExecutorException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Disable SLA alerts through {@link CoordSLAAlertsDisableXCommand}.
     *
     * @param id job id
     * @param actions actions passed to the command
     * @param dates dates passed to the command
     * @param childIds not used by this implementation
     * @throws BaseEngineException thrown if the command failed
     */
    @Override
    public void disableSLAAlert(String id, String actions, String dates, String childIds)
            throws BaseEngineException {
        try {
            new CoordSLAAlertsDisableXCommand(id, actions, dates).call();
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Change SLA settings through {@link CoordSLAChangeXCommand}.
     *
     * @param id job id
     * @param actions actions passed to the command
     * @param dates dates passed to the command
     * @param childIds not used by this implementation
     * @param newParams new SLA parameters, parsed with {@link JobUtils#parseChangeValue(String)}; may be null
     * @throws BaseEngineException thrown if parsing or the command failed
     */
    @Override
    public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
            throws BaseEngineException {
        try {
            Map<String, String> slaNewParams = null;
            if (newParams != null) {
                slaNewParams = JobUtils.parseChangeValue(newParams);
            }
            new CoordSLAChangeXCommand(id, actions, dates, slaNewParams).call();
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Enable SLA alerts through {@link CoordSLAAlertsEnableXCommand}.
     *
     * @param id job id
     * @param actions actions passed to the command
     * @param dates dates passed to the command
     * @param childIds not used by this implementation
     * @throws BaseEngineException thrown if the command failed
     */
    @Override
    public void enableSLAAlert(String id, String actions, String dates, String childIds)
            throws BaseEngineException {
        try {
            new CoordSLAAlertsEnableXCommand(id, actions, dates).call();
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * return a list of killed Coordinator job
     *
     * @param filter the filter string for which the coordinator jobs are killed
     * @param start the starting index for coordinator jobs
     * @param length maximum number of jobs to be killed
     * @return the list of jobs being killed; an empty info object if the command returned null
     * @throws CoordinatorEngineException thrown if one or more of the jobs cannot be killed
     */
    public CoordinatorJobInfo killJobs(String filter, int start, int length) throws CoordinatorEngineException {
        return runBulkOperation(filter, start, length, OperationType.Kill);
    }

    /**
     * return the jobs that've been suspended
     *
     * @param filter Filter for jobs that will be suspended, can be name, user, group, status, id or combination
     *        of any
     * @param start Offset for the jobs that will be suspended
     * @param length maximum number of jobs that will be suspended
     * @return the jobs being suspended; an empty info object if the command returned null
     * @throws CoordinatorEngineException thrown if the filter is invalid or the command failed
     */
    public CoordinatorJobInfo suspendJobs(String filter, int start, int length) throws CoordinatorEngineException {
        return runBulkOperation(filter, start, length, OperationType.Suspend);
    }

    /**
     * return the jobs that've been resumed
     *
     * @param filter Filter for jobs that will be resumed, can be name, user, group, status, id or combination
     *        of any
     * @param start Offset for the jobs that will be resumed
     * @param length maximum number of jobs that will be resumed
     * @return the jobs being resumed; an empty info object if the command returned null
     * @throws CoordinatorEngineException thrown if the filter is invalid or the command failed
     */
    public CoordinatorJobInfo resumeJobs(String filter, int start, int length) throws CoordinatorEngineException {
        return runBulkOperation(filter, start, length, OperationType.Resume);
    }

    /**
     * Run a bulk operation over the jobs selected by a filter via {@link BulkCoordXCommand}.
     *
     * @param filter jobs filter, parsed by {@link #parseJobsFilter(String)}
     * @param start start value passed to the command
     * @param length length value passed to the command
     * @param operation the bulk operation to apply
     * @return the command result, or an empty info object if the command returned null
     * @throws CoordinatorEngineException thrown if the filter is invalid or the command failed
     */
    private CoordinatorJobInfo runBulkOperation(String filter, int start, int length, OperationType operation)
            throws CoordinatorEngineException {
        try {
            Map<String, List<String>> filterMap = parseJobsFilter(filter);
            CoordinatorJobInfo coordinatorJobInfo =
                    new BulkCoordXCommand(filterMap, start, length, operation).call();
            if (coordinatorJobInfo == null) {
                return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0);
            }
            return coordinatorJobInfo;
        }
        catch (CommandException ex) {
            throw new CoordinatorEngineException(ex);
        }
    }
}