/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie;

import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionInfoXCommand;
import org.apache.oozie.util.CoordActionsInDateRange;
import org.apache.oozie.command.coord.CoordChangeXCommand;
import org.apache.oozie.command.coord.CoordJobXCommand;
import org.apache.oozie.command.coord.CoordJobsXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordRerunXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogStreamer;

import com.google.common.annotations.VisibleForTesting;

/**
 * Engine facade for coordinator jobs: each operation builds the matching coordinator X-command, runs it, and
 * rethrows any {@link CommandException} as an engine-level exception.
 */
public class CoordinatorEngine extends BaseEngine {
    private static final XLog LOG = XLog.getLog(CoordinatorEngine.class);

    /**
     * Create a system Coordinator engine, with no user and no group.
     */
    public CoordinatorEngine() {
        if (!Services.get().getConf().getBoolean(USE_XCOMMAND, true)) {
            LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
        }
        else {
            LOG.debug("Oozie CoordinatorEngine is using XCommands.");
        }
    }

    /**
     * Create a Coordinator engine to perform operations on behalf of a user.
     *
     * @param user user name; must not be empty.
     */
    public CoordinatorEngine(String user) {
        this();
        this.user = ParamChecker.notEmpty(user, "user");
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
     */
    @Override
    public String getDefinition(String jobId) throws BaseEngineException {
        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
        return job.getOrigJobXml();
    }

    /**
     * Load a coordinator job bean through the single-argument {@link CoordJobXCommand}.
     *
     * @param jobId coordinator job id
     * @return CoordinatorJobBean returned by the command
     * @throws BaseEngineException if the command fails
     */
    private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
        try {
            return new CoordJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /**
     * Load a coordinator action bean.
     *
     * @param actionId coordinator action id
     * @return CoordinatorActionBean returned by {@link CoordActionInfoXCommand}
     * @throws BaseEngineException if the command fails
     */
    public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
        try {
            return new CoordActionInfoXCommand(actionId).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
     */
    @Override
    public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
        // Runs exactly the same command as the private helper, so share the implementation.
        return getCoordJobWithNoActionInfo(jobId);
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
     */
    @Override
    public CoordinatorJobBean getCoordJob(String jobId, String filter, int start, int length, boolean desc)
            throws BaseEngineException {
        // The filter is validated before the command is built, so a malformed filter never reaches the command.
        List<String> filterList = parseStatusFilter(filter);
        try {
            return new CoordJobXCommand(jobId, filterList, start, length, desc).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
     */
    @Override
    public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
        // This engine performs no lookup by external id; it always answers null.
        return null;
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
     */
    @Override
    public void kill(String jobId) throws CoordinatorEngineException {
        try {
            new CoordKillXCommand(jobId).call();
            LOG.info("User " + user + " killed the Coordinator job " + jobId);
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
     */
    @Override
    public void change(String jobId, String changeValue) throws CoordinatorEngineException {
        try {
            new CoordChangeXCommand(jobId, changeValue).call();
            LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue);
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Not supported by this engine; always fails with {@link ErrorCode#E0301}.
     *
     * @deprecated use {@link #reRun(String, String, String, boolean, boolean)} instead.
     */
    @Override
    @Deprecated
    public void reRun(String jobId, Configuration conf) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun"));
    }

    /**
     * Rerun coordinator actions for given rerunType
     *
     * @param jobId coordinator job id
     * @param rerunType rerun type, passed through to {@link CoordRerunXCommand}
     * @param scope rerun scope, passed through to {@link CoordRerunXCommand}
     * @param refresh passed through to {@link CoordRerunXCommand}
     * @param noCleanup passed through to {@link CoordRerunXCommand}
     * @return the value returned by the rerun command
     * @throws BaseEngineException if the command fails
     */
    public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup)
            throws BaseEngineException {
        try {
            return new CoordRerunXCommand(jobId, rerunType, scope, refresh, noCleanup).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
     */
    @Override
    public void resume(String jobId) throws CoordinatorEngineException {
        try {
            new CoordResumeXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Not supported by this engine; always fails with {@link ErrorCode#E0301}.
     */
    @Override
    @Deprecated
    public void start(String jobId) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String,
     * java.io.Writer)
     */
    @Override
    public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException {
        XLogStreamer.Filter filter = new XLogStreamer.Filter();
        filter.setParameter(DagXLogInfoService.JOB, jobId);
        streamJobLog(jobId, filter, writer);
    }

    /**
     * Stream the job log, optionally restricted to a set of coordinator actions.
     * <p>
     * When both {@code logRetrievalScope} and {@code logRetrievalType} are non-null, the scope is turned into a
     * list of action ids that is set as the {@link DagXLogInfoService#ACTION} filter parameter:
     * <ul>
     * <li>type {@link RestConstants#JOB_LOG_ACTION}: the scope is a comma separated list of action numbers and
     * inclusive {@code start-end} ranges;</li>
     * <li>type {@link RestConstants#JOB_LOG_DATE}: the scope is resolved to action ids by
     * {@link CoordActionsInDateRange#getCoordActionIdsFromDates}.</li>
     * </ul>
     * Otherwise the log is filtered by job id only.
     *
     * @param jobId Job Id
     * @param logRetrievalScope Value for the retrieval type
     * @param logRetrievalType Based on which filter criteria the log is retrieved
     * @param writer writer to stream the log to
     * @throws IOException declared for the underlying log streaming call
     * @throws BaseEngineException if the job cannot be loaded
     * @throws CommandException with {@link ErrorCode#E0302} if the scope cannot be parsed
     */
    public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer)
            throws IOException, BaseEngineException, CommandException {
        XLogStreamer.Filter filter = new XLogStreamer.Filter();
        filter.setParameter(DagXLogInfoService.JOB, jobId);
        if (logRetrievalScope != null && logRetrievalType != null) {
            // if coordinator action logs are to be retrieved based on action id range
            // NOTE(review): this comparison is case-sensitive while the date comparison below is not;
            // left unchanged here -- confirm whether the difference is intended.
            if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
                Set<String> actionIds = parseActionScope(jobId, logRetrievalScope);
                filter.setParameter(DagXLogInfoService.ACTION, joinAsAlternation(actionIds));
            }
            // if coordinator action logs are to be retrieved based on date range
            // this block gets the corresponding list of coordinator actions to be used by the log filter
            if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
                List<String> coordActionIdList;
                try {
                    coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope);
                }
                catch (XException xe) {
                    throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
                }
                filter.setParameter(DagXLogInfoService.ACTION, joinAsAlternation(coordActionIdList));
            }
        }
        streamJobLog(jobId, filter, writer);
    }

    /**
     * Load the job and stream its log from the job's created time up to now, using the given filter.
     */
    private void streamJobLog(String jobId, XLogStreamer.Filter filter, Writer writer)
            throws IOException, BaseEngineException {
        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
        Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
    }

    /**
     * Expand a comma separated list of action numbers and inclusive {@code start-end} ranges into action ids of
     * the form {@code jobId@number}, without duplicates and in order of first appearance.
     */
    private static Set<String> parseActionScope(String jobId, String scope) throws CommandException {
        // Use set implementation that maintains order of elements to achieve reproducibility:
        Set<String> actionIds = new LinkedHashSet<String>();
        for (String element : scope.split(",")) {
            String s = element.trim();
            if (s.contains("-")) {
                String[] range = s.split("-");
                if (range.length != 2) {
                    throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
                }
                int start = parseActionNumber(range[0]);
                int end = parseActionNumber(range[1]);
                if (start > end) {
                    throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
                }
                for (int i = start; i <= end; i++) {
                    actionIds.add(jobId + "@" + i);
                }
            }
            else {
                try {
                    // Validation only: the id is built from the text exactly as the caller wrote it.
                    Integer.parseInt(s);
                }
                catch (NumberFormatException ne) {
                    throw new CommandException(ErrorCode.E0302, "format is wrong for action id '" + s
                            + "'. Integer only.", ne);
                }
                actionIds.add(jobId + "@" + s);
            }
        }
        return actionIds;
    }

    /**
     * Parse one bound of an action range, reporting a non-integer bound as {@link ErrorCode#E0302}.
     */
    private static int parseActionNumber(String raw) throws CommandException {
        String trimmed = raw.trim();
        try {
            return Integer.parseInt(trimmed);
        }
        catch (NumberFormatException ne) {
            throw new CommandException(ErrorCode.E0302, "could not parse " + trimmed + " into an integer", ne);
        }
    }

    /**
     * Join the ids with {@code |}; the result is wrapped in parentheses only when there is more than one id.
     * An empty collection yields an empty string.
     */
    private static String joinAsAlternation(Collection<String> ids) {
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (String id : ids) {
            if (!first) {
                joined.append("|");
            }
            joined.append(id);
            first = false;
        }
        if (ids.size() > 1) {
            joined.insert(0, "(");
            joined.append(")");
        }
        return joined.toString();
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
     * , boolean)
     */
    @Override
    public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
        // startJob is not consulted by this implementation.
        try {
            CoordSubmitXCommand submit = new CoordSubmitXCommand(conf);
            return submit.call();
        }
        catch (CommandException ex) {
            throw new CoordinatorEngineException(ex);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
     */
    @Override
    public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException {
        try {
            CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf);
            return submit.call();
        }
        catch (CommandException ex) {
            throw new CoordinatorEngineException(ex);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
     */
    @Override
    public void suspend(String jobId) throws CoordinatorEngineException {
        try {
            new CoordSuspendXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
     */
    @Override
    public WorkflowJob getJob(String jobId) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
     */
    @Override
    public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
    }

    /** Filter names accepted by {@link #parseFilter(String)}. */
    private static final Set<String> FILTER_NAMES = new HashSet<String>();

    static {
        FILTER_NAMES.add(OozieClient.FILTER_USER);
        FILTER_NAMES.add(OozieClient.FILTER_NAME);
        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
        FILTER_NAMES.add(OozieClient.FILTER_ID);
        FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
        FILTER_NAMES.add(OozieClient.FILTER_UNIT);
    }

    /**
     * List coordinator jobs matching a filter.
     *
     * @param filter filter string in the format accepted by {@link #parseFilter(String)}; may be null
     * @param start passed through to {@link CoordJobsXCommand}
     * @param len passed through to {@link CoordJobsXCommand}
     * @return CoordinatorJobInfo returned by the command
     * @throws CoordinatorEngineException if the filter is invalid or the command fails
     */
    public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
        Map<String, List<String>> filterList = parseFilter(filter);

        try {
            return new CoordJobsXCommand(filterList, start, len).call();
        }
        catch (CommandException ex) {
            throw new CoordinatorEngineException(ex);
        }
    }

    /**
     * Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values.
     *
     * @param filter semicolon separated {@code status=VALUE} pairs; may be null
     * @return the status values in the order given; empty when {@code filter} is null
     * @throws CoordinatorEngineException with {@link ErrorCode#E0421} if an element is not a name=value pair,
     *         the name is not "status", or the value is not a {@link CoordinatorAction.Status} name
     */
    private List<String> parseStatusFilter(String filter) throws CoordinatorEngineException {
        List<String> filterList = new ArrayList<String>();
        if (filter == null) {
            return filterList;
        }
        //split name;value pairs
        StringTokenizer st = new StringTokenizer(filter, ";");
        while (st.hasMoreTokens()) {
            String token = st.nextToken();
            if (!token.contains("=")) {
                throw new CoordinatorEngineException(ErrorCode.E0421, token,
                        "elements must be name=value pairs");
            }
            String[] pair = token.split("=");
            if (pair.length != 2) {
                throw new CoordinatorEngineException(ErrorCode.E0421, token,
                        "elements must be name=value pairs");
            }
            if (!pair[0].equalsIgnoreCase("status")) {
                // Check for incorrect filter option
                throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
                        "invalid filter [{0}]." + " The only valid filter is \"status\"", pair[0]));
            }
            String statusValue = pair[1];
            try {
                CoordinatorAction.Status.valueOf(statusValue);
            }
            catch (IllegalArgumentException ex) {
                // Check for incorrect status value
                StringBuilder validStatusList = new StringBuilder();
                for (CoordinatorAction.Status status : CoordinatorAction.Status.values()) {
                    validStatusList.append(status.toString()).append(" ");
                }
                throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
                        "invalid status value [{0}]." + " Valid status values are: [{1}]", statusValue,
                        validStatusList));
            }
            filterList.add(statusValue);
        }
        return filterList;
    }

    /**
     * Parse a jobs filter of semicolon separated {@code name=value} pairs into a map from name to values.
     * <p>
     * Names are checked case-insensitively against {@link #FILTER_NAMES}. {@code frequency} and {@code unit} are
     * handled specially: the frequency is truncated to an integer, the unit must be one of months, days, hours or
     * minutes, and when a frequency is given both end up in the map under the keys "frequency" and "unit" (unit
     * normalized to MONTH, DAY or MINUTE, with hours converted to minutes). All other pairs are stored under the
     * name exactly as written in the filter.
     *
     * @param filter the filter string; may be null
     * @return {@code Map<String, List<String>>} of filter names to values; empty when {@code filter} is null
     * @throws CoordinatorEngineException with {@link ErrorCode#E0420} if the filter is malformed
     */
    @VisibleForTesting
    Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException {
        Map<String, List<String>> map = new HashMap<String, List<String>>();
        if (filter == null) {
            return map;
        }
        boolean isTimeUnitSpecified = false;
        String timeUnit = "MINUTE";
        boolean isFrequencySpecified = false;
        String frequency = "";

        StringTokenizer st = new StringTokenizer(filter, ";");
        while (st.hasMoreTokens()) {
            String token = st.nextToken();
            if (!token.contains("=")) {
                throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
            }
            String[] pair = token.split("=");
            if (pair.length != 2) {
                throw new CoordinatorEngineException(ErrorCode.E0420, filter,
                        "elements must be name=value pairs");
            }
            String name = pair[0];
            String value = pair[1];
            // Locale.ROOT keeps the lookup independent of the JVM default locale (e.g. Turkish dotless i).
            if (!FILTER_NAMES.contains(name.toLowerCase(Locale.ROOT))) {
                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
                        name));
            }
            if (name.equalsIgnoreCase("frequency")) {
                isFrequencySpecified = true;
                try {
                    // Fractional frequencies are truncated towards zero.
                    frequency = String.valueOf((int) Float.parseFloat(value));
                }
                catch (NumberFormatException nfe) {
                    throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
                            "invalid value [{0}] for frequency. A numerical value is expected", value));
                }
                continue;
            }
            if (name.equalsIgnoreCase("unit")) {
                isTimeUnitSpecified = true;
                timeUnit = value;
                if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
                        && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
                    throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
                            "invalid value [{0}] for time unit. "
                                    + "Valid value is one of months, days, hours or minutes", value));
                }
                continue;
            }
            // Matched case-insensitively, like every other name check in this method, so that a filter such as
            // "STATUS=bogus" cannot skip status validation.
            if (name.equalsIgnoreCase("status")) {
                try {
                    CoordinatorJob.Status.valueOf(value);
                }
                catch (IllegalArgumentException ex) {
                    throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
                            "invalid status [{0}]", value));
                }
            }
            List<String> list = map.get(name);
            if (list == null) {
                list = new ArrayList<String>();
                map.put(name, list);
            }
            list.add(value);
        }

        // Unit is specified and frequency is not specified
        if (!isFrequencySpecified && isTimeUnitSpecified) {
            throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
                    + "frequency is specified. Either specify frequency also or else remove the time unit");
        }
        if (isFrequencySpecified) {
            // Frequency value is specified
            if (isTimeUnitSpecified) {
                if (timeUnit.equalsIgnoreCase("months")) {
                    timeUnit = "MONTH";
                }
                else if (timeUnit.equalsIgnoreCase("days")) {
                    timeUnit = "DAY";
                }
                else if (timeUnit.equalsIgnoreCase("hours")) {
                    // When job details are persisted to database, frequency in hours are converted to minutes.
                    // This conversion is to conform with that.
                    frequency = String.valueOf(Integer.parseInt(frequency) * 60);
                    timeUnit = "MINUTE";
                }
                else if (timeUnit.equalsIgnoreCase("minutes")) {
                    timeUnit = "MINUTE";
                }
            }
            // Adding the frequency and time unit filters to the filter map
            List<String> unitValues = new ArrayList<String>();
            unitValues.add(timeUnit);
            map.put("unit", unitValues);
            List<String> frequencyValues = new ArrayList<String>();
            frequencyValues.add(frequency);
            map.put("frequency", frequencyValues);
        }
        return map;
    }
}