001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import java.io.IOException; 021 import java.io.Writer; 022 import java.util.ArrayList; 023 import java.util.Date; 024 import java.util.HashMap; 025 import java.util.HashSet; 026 import java.util.Iterator; 027 import java.util.List; 028 import java.util.Map; 029 import java.util.Set; 030 import java.util.StringTokenizer; 031 import org.apache.hadoop.conf.Configuration; 032 import org.apache.oozie.client.CoordinatorAction; 033 import org.apache.oozie.client.CoordinatorJob; 034 import org.apache.oozie.client.OozieClient; 035 import org.apache.oozie.client.WorkflowJob; 036 import org.apache.oozie.client.rest.RestConstants; 037 import org.apache.oozie.command.CommandException; 038 import org.apache.oozie.command.coord.CoordActionInfoXCommand; 039 import org.apache.oozie.util.CoordActionsInDateRange; 040 import org.apache.oozie.command.coord.CoordChangeXCommand; 041 import org.apache.oozie.command.coord.CoordJobXCommand; 042 import org.apache.oozie.command.coord.CoordJobsXCommand; 043 import org.apache.oozie.command.coord.CoordKillXCommand; 044 import org.apache.oozie.command.coord.CoordRerunXCommand; 045 import org.apache.oozie.command.coord.CoordResumeXCommand; 046 import org.apache.oozie.command.coord.CoordSubmitXCommand; 047 import org.apache.oozie.command.coord.CoordSuspendXCommand; 048 import org.apache.oozie.service.DagXLogInfoService; 049 import org.apache.oozie.service.Services; 050 import org.apache.oozie.service.XLogService; 051 import org.apache.oozie.util.ParamChecker; 052 import org.apache.oozie.util.XLog; 053 import org.apache.oozie.util.XLogStreamer; 054 055 public class CoordinatorEngine extends BaseEngine { 056 private static XLog LOG = XLog.getLog(CoordinatorEngine.class); 057 058 /** 059 * Create a system Coordinator engine, with no user and no group. 060 */ 061 public CoordinatorEngine() { 062 if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) { 063 LOG.debug("Oozie CoordinatorEngine is not using XCommands."); 064 } 065 else { 066 LOG.debug("Oozie CoordinatorEngine is using XCommands."); 067 } 068 } 069 070 /** 071 * Create a Coordinator engine to perform operations on behave of a user. 072 * 073 * @param user user name. 074 * @param authToken the authentication token. 075 */ 076 public CoordinatorEngine(String user, String authToken) { 077 this(); 078 this.user = ParamChecker.notEmpty(user, "user"); 079 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 080 } 081 082 /* 083 * (non-Javadoc) 084 * 085 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String) 086 */ 087 @Override 088 public String getDefinition(String jobId) throws BaseEngineException { 089 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); 090 return job.getOrigJobXml(); 091 } 092 093 /** 094 * @param jobId 095 * @return CoordinatorJobBean 096 * @throws BaseEngineException 097 */ 098 private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException { 099 try { 100 return new CoordJobXCommand(jobId).call(); 101 } 102 catch (CommandException ex) { 103 throw new BaseEngineException(ex); 104 } 105 } 106 107 /** 108 * @param actionId 109 * @return CoordinatorActionBean 110 * @throws BaseEngineException 111 */ 112 public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException { 113 try { 114 return new CoordActionInfoXCommand(actionId).call(); 115 } 116 catch (CommandException ex) { 117 throw new BaseEngineException(ex); 118 } 119 } 120 121 /* 122 * (non-Javadoc) 123 * 124 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String) 125 */ 126 @Override 127 public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException { 128 try { 129 return new CoordJobXCommand(jobId).call(); 130 } 131 catch (CommandException ex) { 132 throw new BaseEngineException(ex); 133 } 134 } 135 136 /* 137 * (non-Javadoc) 138 * 139 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int) 140 */ 141 @Override 142 public CoordinatorJobBean getCoordJob(String jobId, String filter, int start, int length) throws BaseEngineException { 143 List<String> filterList = parseStatusFilter(filter); 144 try { 145 return new CoordJobXCommand(jobId, filterList, start, length) 146 .call(); 147 } 148 catch (CommandException ex) { 149 throw new BaseEngineException(ex); 150 } 151 } 152 153 /* 154 * (non-Javadoc) 155 * 156 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String) 157 */ 158 @Override 159 public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException { 160 return null; 161 } 162 163 /* 164 * (non-Javadoc) 165 * 166 * @see org.apache.oozie.BaseEngine#kill(java.lang.String) 167 */ 168 @Override 169 public void kill(String jobId) throws CoordinatorEngineException { 170 try { 171 new CoordKillXCommand(jobId).call(); 172 LOG.info("User " + user + " killed the Coordinator job " + jobId); 173 } 174 catch (CommandException e) { 175 throw new CoordinatorEngineException(e); 176 } 177 } 178 179 /* (non-Javadoc) 180 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String) 181 */ 182 @Override 183 public void change(String jobId, String changeValue) throws CoordinatorEngineException { 184 try { 185 new CoordChangeXCommand(jobId, changeValue).call(); 186 LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue); 187 } 188 catch (CommandException e) { 189 throw new CoordinatorEngineException(e); 190 } 191 } 192 193 @Override 194 @Deprecated 195 public void reRun(String jobId, Configuration conf) throws BaseEngineException { 196 throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun")); 197 } 198 199 /** 200 * Rerun coordinator actions for given rerunType 201 * 202 * @param jobId 203 * @param rerunType 204 * @param scope 205 * @param refresh 206 * @param noCleanup 207 * @throws BaseEngineException 208 */ 209 public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) 210 throws BaseEngineException { 211 try { 212 return new CoordRerunXCommand(jobId, rerunType, scope, refresh, 213 noCleanup).call(); 214 } 215 catch (CommandException ex) { 216 throw new BaseEngineException(ex); 217 } 218 } 219 220 /* 221 * (non-Javadoc) 222 * 223 * @see org.apache.oozie.BaseEngine#resume(java.lang.String) 224 */ 225 @Override 226 public void resume(String jobId) throws CoordinatorEngineException { 227 try { 228 new CoordResumeXCommand(jobId).call(); 229 } 230 catch (CommandException e) { 231 throw new CoordinatorEngineException(e); 232 } 233 } 234 235 @Override 236 @Deprecated 237 public void start(String jobId) throws BaseEngineException { 238 throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start")); 239 } 240 241 /* 242 * (non-Javadoc) 243 * 244 * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, 245 * java.io.Writer) 246 */ 247 @Override 248 public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException { 249 XLogStreamer.Filter filter = new XLogStreamer.Filter(); 250 filter.setParameter(DagXLogInfoService.JOB, jobId); 251 252 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); 253 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer); 254 } 255 256 /** 257 * Add list of actions to the filter based on conditions 258 * 259 * @param jobId Job Id 260 * @param logRetrievalScope Value for the retrieval type 261 * @param logRetrievalType Based on which filter criteria the log is retrieved 262 * @param writer writer to stream the log to 263 * @throws IOException 264 * @throws BaseEngineException 265 * @throws CommandException 266 */ 267 public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer) 268 throws IOException, BaseEngineException, CommandException { 269 XLogStreamer.Filter filter = new XLogStreamer.Filter(); 270 filter.setParameter(DagXLogInfoService.JOB, jobId); 271 if (logRetrievalScope != null && logRetrievalType != null) { 272 // if coordinator action logs are to be retrieved based on action id range 273 if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) { 274 Set<String> actions = new HashSet<String>(); 275 String[] list = logRetrievalScope.split(","); 276 for (String s : list) { 277 s = s.trim(); 278 if (s.contains("-")) { 279 String[] range = s.split("-"); 280 if (range.length != 2) { 281 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s 282 + "'"); 283 } 284 int start; 285 int end; 286 try { 287 start = Integer.parseInt(range[0].trim()); 288 } catch (NumberFormatException ne) { 289 throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer", 290 ne); 291 } 292 try { 293 end = Integer.parseInt(range[1].trim()); 294 } catch (NumberFormatException ne) { 295 throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer", 296 ne); 297 } 298 if (start > end) { 299 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'"); 300 } 301 for (int i = start; i <= end; i++) { 302 actions.add(jobId + "@" + i); 303 } 304 } 305 else { 306 try { 307 Integer.parseInt(s); 308 } 309 catch (NumberFormatException ne) { 310 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s 311 + "'. Integer only."); 312 } 313 actions.add(jobId + "@" + s); 314 } 315 } 316 317 Iterator<String> actionsIterator = actions.iterator(); 318 StringBuilder orSeparatedActions = new StringBuilder(""); 319 boolean orRequired = false; 320 while (actionsIterator.hasNext()) { 321 if (orRequired) { 322 orSeparatedActions.append("|"); 323 } 324 orSeparatedActions.append(actionsIterator.next().toString()); 325 orRequired = true; 326 } 327 if (actions.size() > 1 && orRequired) { 328 orSeparatedActions.insert(0, "("); 329 orSeparatedActions.append(")"); 330 } 331 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString()); 332 } 333 // if coordinator action logs are to be retrieved based on date range 334 // this block gets the corresponding list of coordinator actions to be used by the log filter 335 if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) { 336 List<String> coordActionIdList = null; 337 try { 338 coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope); 339 } 340 catch (XException xe) { 341 throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe); 342 } 343 StringBuilder orSeparatedActions = new StringBuilder(""); 344 boolean orRequired = false; 345 for (String coordActionId : coordActionIdList) { 346 if (orRequired) { 347 orSeparatedActions.append("|"); 348 } 349 orSeparatedActions.append(coordActionId); 350 orRequired = true; 351 } 352 if (coordActionIdList.size() > 1 && orRequired) { 353 orSeparatedActions.insert(0, "("); 354 orSeparatedActions.append(")"); 355 } 356 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString()); 357 } 358 } 359 CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId); 360 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer); 361 } 362 363 /* 364 * (non-Javadoc) 365 * 366 * @see 367 * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration 368 * , boolean) 369 */ 370 @Override 371 public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException { 372 try { 373 CoordSubmitXCommand submit = new CoordSubmitXCommand(conf, 374 getAuthToken()); 375 return submit.call(); 376 } 377 catch (CommandException ex) { 378 throw new CoordinatorEngineException(ex); 379 } 380 } 381 382 /* 383 * (non-Javadoc) 384 * 385 * @see 386 * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration) 387 */ 388 @Override 389 public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException { 390 try { 391 CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf, 392 getAuthToken()); 393 return submit.call(); 394 } 395 catch (CommandException ex) { 396 throw new CoordinatorEngineException(ex); 397 } 398 } 399 400 /* 401 * (non-Javadoc) 402 * 403 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String) 404 */ 405 @Override 406 public void suspend(String jobId) throws CoordinatorEngineException { 407 try { 408 new CoordSuspendXCommand(jobId).call(); 409 } 410 catch (CommandException e) { 411 throw new CoordinatorEngineException(e); 412 } 413 414 } 415 416 /* 417 * (non-Javadoc) 418 * 419 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String) 420 */ 421 @Override 422 public WorkflowJob getJob(String jobId) throws BaseEngineException { 423 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine")); 424 } 425 426 /* 427 * (non-Javadoc) 428 * 429 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int) 430 */ 431 @Override 432 public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException { 433 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine")); 434 } 435 436 private static final Set<String> FILTER_NAMES = new HashSet<String>(); 437 438 static { 439 FILTER_NAMES.add(OozieClient.FILTER_USER); 440 FILTER_NAMES.add(OozieClient.FILTER_NAME); 441 FILTER_NAMES.add(OozieClient.FILTER_GROUP); 442 FILTER_NAMES.add(OozieClient.FILTER_STATUS); 443 FILTER_NAMES.add(OozieClient.FILTER_ID); 444 FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY); 445 FILTER_NAMES.add(OozieClient.FILTER_UNIT); 446 } 447 448 /** 449 * @param filter 450 * @param start 451 * @param len 452 * @return CoordinatorJobInfo 453 * @throws CoordinatorEngineException 454 */ 455 public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException { 456 Map<String, List<String>> filterList = parseFilter(filter); 457 458 try { 459 return new CoordJobsXCommand(filterList, start, len).call(); 460 } 461 catch (CommandException ex) { 462 throw new CoordinatorEngineException(ex); 463 } 464 } 465 466 467 // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values 468 private List<String> parseStatusFilter(String filter) throws CoordinatorEngineException { 469 List<String> filterList = new ArrayList<String>(); 470 if (filter != null) { 471 //split name;value pairs 472 StringTokenizer st = new StringTokenizer(filter, ";"); 473 while (st.hasMoreTokens()) { 474 String token = st.nextToken(); 475 if (token.contains("=")) { 476 String[] pair = token.split("="); 477 if (pair.length != 2) { 478 throw new CoordinatorEngineException(ErrorCode.E0421, token, 479 "elements must be name=value pairs"); 480 } 481 if (pair[0].equalsIgnoreCase("status")) { 482 String statusValue = pair[1]; 483 try { 484 CoordinatorAction.Status.valueOf(statusValue); 485 } catch (IllegalArgumentException ex) { 486 StringBuilder validStatusList = new StringBuilder(); 487 for (CoordinatorAction.Status status: CoordinatorAction.Status.values()){ 488 validStatusList.append(status.toString()+" "); 489 } 490 // Check for incorrect status value 491 throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format( 492 "invalid status value [{0}]." + " Valid status values are: [{1}]", statusValue, validStatusList)); 493 } 494 filterList.add(statusValue); 495 } else { 496 // Check for incorrect filter option 497 throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format( 498 "invalid filter [{0}]." + " The only valid filter is \"status\"", pair[0])); 499 } 500 } else { 501 throw new CoordinatorEngineException(ErrorCode.E0421, token, 502 "elements must be name=value pairs"); 503 } 504 } 505 } 506 return filterList; 507 } 508 509 /** 510 * @param filter 511 * @return Map<String, List<String>> 512 * @throws CoordinatorEngineException 513 */ 514 private Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException { 515 Map<String, List<String>> map = new HashMap<String, List<String>>(); 516 boolean isTimeUnitSpecified = false; 517 String timeUnit = "MINUTE"; 518 boolean isFrequencySpecified = false; 519 String frequency = ""; 520 if (filter != null) { 521 StringTokenizer st = new StringTokenizer(filter, ";"); 522 while (st.hasMoreTokens()) { 523 String token = st.nextToken(); 524 if (token.contains("=")) { 525 String[] pair = token.split("="); 526 if (pair.length != 2) { 527 throw new CoordinatorEngineException(ErrorCode.E0420, filter, 528 "elements must be name=value pairs"); 529 } 530 if (!FILTER_NAMES.contains(pair[0].toLowerCase())) { 531 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]", 532 pair[0])); 533 } 534 if (pair[0].equalsIgnoreCase("frequency")) { 535 isFrequencySpecified = true; 536 try { 537 frequency = (int) Float.parseFloat(pair[1]) + ""; 538 continue; 539 } 540 catch (NumberFormatException NANException) { 541 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( 542 "invalid value [{0}] for frequency. A numerical value is expected", pair[1])); 543 } 544 } 545 if (pair[0].equalsIgnoreCase("unit")) { 546 isTimeUnitSpecified = true; 547 timeUnit = pair[1]; 548 if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days") 549 && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) { 550 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( 551 "invalid value [{0}] for time unit. " 552 + "Valid value is one of months, days, hours or minutes", pair[1])); 553 } 554 continue; 555 } 556 if (pair[0].equals("status")) { 557 try { 558 CoordinatorJob.Status.valueOf(pair[1]); 559 } 560 catch (IllegalArgumentException ex) { 561 throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format( 562 "invalid status [{0}]", pair[1])); 563 } 564 } 565 List<String> list = map.get(pair[0]); 566 if (list == null) { 567 list = new ArrayList<String>(); 568 map.put(pair[0], list); 569 } 570 list.add(pair[1]); 571 } else { 572 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs"); 573 } 574 } 575 // Unit is specified and frequency is not specified 576 if (!isFrequencySpecified && isTimeUnitSpecified) { 577 throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when " 578 + "frequency is specified. Either specify frequency also or else remove the time unit"); 579 } else if (isFrequencySpecified) { 580 // Frequency value is specified 581 if (isTimeUnitSpecified) { 582 if (timeUnit.equalsIgnoreCase("months")) { 583 timeUnit = "MONTH"; 584 } else if (timeUnit.equalsIgnoreCase("days")) { 585 timeUnit = "DAY"; 586 } else if (timeUnit.equalsIgnoreCase("hours")) { 587 // When job details are persisted to database, frequency in hours are converted to minutes. 588 // This conversion is to conform with that. 589 frequency = Integer.parseInt(frequency) * 60 + ""; 590 timeUnit = "MINUTE"; 591 } else if (timeUnit.equalsIgnoreCase("minutes")) { 592 timeUnit = "MINUTE"; 593 } 594 } 595 // Adding the frequency and time unit filters to the filter map 596 List<String> list = new ArrayList<String>(); 597 list.add(timeUnit); 598 map.put("unit", list); 599 list = new ArrayList<String>(); 600 list.add(frequency); 601 map.put("frequency", list); 602 } 603 } 604 return map; 605 } 606 }