/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie;

import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionInfoXCommand;
import org.apache.oozie.util.CoordActionsInDateRange;
import org.apache.oozie.command.coord.CoordChangeXCommand;
import org.apache.oozie.command.coord.CoordJobXCommand;
import org.apache.oozie.command.coord.CoordJobsXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordRerunXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogStreamer;

/**
 * Engine for coordinator jobs.
 * <p>
 * Each operation builds the matching coordinator XCommand, invokes its <code>call()</code> method and rethrows any
 * {@link CommandException} as a {@link BaseEngineException} or {@link CoordinatorEngineException}. Operations
 * inherited from {@link BaseEngine} that this class does not implement throw an exception carrying
 * {@link ErrorCode#E0301}.
 */
public class CoordinatorEngine extends BaseEngine {
    private static final XLog LOG = XLog.getLog(CoordinatorEngine.class);

    /**
     * Create a system Coordinator engine, with no user and no group.
     * <p>
     * Only logs, at debug level, whether the {@code USE_XCOMMAND} configuration flag (default <code>true</code>)
     * is set.
     */
    public CoordinatorEngine() {
        if (Services.get().getConf().getBoolean(USE_XCOMMAND, true)) {
            LOG.debug("Oozie CoordinatorEngine is using XCommands.");
        }
        else {
            LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
        }
    }

    /**
     * Create a Coordinator engine to perform operations on behalf of a user.
     *
     * @param user user name; checked with {@link ParamChecker#notEmpty}.
     * @param authToken the authentication token; checked with {@link ParamChecker#notEmpty}.
     */
    public CoordinatorEngine(String user, String authToken) {
        this();
        this.user = ParamChecker.notEmpty(user, "user");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
     */
    @Override
    public String getDefinition(String jobId) throws BaseEngineException {
        return getCoordJobWithNoActionInfo(jobId).getOrigJobXml();
    }

    /**
     * Load a coordinator job through the single-argument {@link CoordJobXCommand} constructor.
     *
     * @param jobId coordinator job id.
     * @return the bean returned by the command.
     * @throws BaseEngineException wrapping the {@link CommandException} raised by the command.
     */
    private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
        try {
            return new CoordJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /**
     * Load a single coordinator action through {@link CoordActionInfoXCommand}.
     *
     * @param actionId coordinator action id.
     * @return the bean returned by the command.
     * @throws BaseEngineException wrapping the {@link CommandException} raised by the command.
     */
    public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
        try {
            return new CoordActionInfoXCommand(actionId).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
     */
    @Override
    public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
        // Same command and same exception wrapping as the private helper, so share it.
        return getCoordJobWithNoActionInfo(jobId);
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
     */
    @Override
    public CoordinatorJobBean getCoordJob(String jobId, String filter, int start, int length)
            throws BaseEngineException {
        // The filter is validated before the command is created; an invalid filter never reaches the command.
        List<String> filterList = parseStatusFilter(filter);
        try {
            return new CoordJobXCommand(jobId, filterList, start, length).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
     */
    @Override
    public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
        // This engine performs no lookup: the result is always null.
        return null;
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
     */
    @Override
    public void kill(String jobId) throws CoordinatorEngineException {
        try {
            new CoordKillXCommand(jobId).call();
            LOG.info("User " + user + " killed the Coordinator job " + jobId);
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
     */
    @Override
    public void change(String jobId, String changeValue) throws CoordinatorEngineException {
        try {
            new CoordChangeXCommand(jobId, changeValue).call();
            LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue);
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Not implemented by this engine: always throws a {@link BaseEngineException} carrying
     * {@link ErrorCode#E0301}.
     */
    @Override
    @Deprecated
    public void reRun(String jobId, Configuration conf) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301));
    }

    /**
     * Rerun coordinator actions for given rerunType
     *
     * @param jobId coordinator job id.
     * @param rerunType passed unchanged to {@link CoordRerunXCommand}.
     * @param scope passed unchanged to {@link CoordRerunXCommand}.
     * @param refresh passed unchanged to {@link CoordRerunXCommand}.
     * @param noCleanup passed unchanged to {@link CoordRerunXCommand}.
     * @return the result of the rerun command.
     * @throws BaseEngineException wrapping the {@link CommandException} raised by the command.
     */
    public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh,
            boolean noCleanup) throws BaseEngineException {
        try {
            return new CoordRerunXCommand(jobId, rerunType, scope, refresh, noCleanup).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
     */
    @Override
    public void resume(String jobId) throws CoordinatorEngineException {
        try {
            new CoordResumeXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /**
     * Not implemented by this engine: always throws a {@link BaseEngineException} carrying
     * {@link ErrorCode#E0301}.
     */
    @Override
    @Deprecated
    public void start(String jobId) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301));
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String,
     * java.io.Writer)
     */
    @Override
    public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException {
        XLogStreamer.Filter filter = new XLogStreamer.Filter();
        filter.setParameter(DagXLogInfoService.JOB, jobId);

        // The log window runs from the job's creation time up to "now".
        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
        Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
    }

    /**
     * Stream the job log, optionally restricted to a set of coordinator actions.
     * <p>
     * When both <code>logRetrievalScope</code> and <code>logRetrievalType</code> are non-null, the type selects
     * how the scope is turned into action ids (the type is matched ignoring case):
     * <ul>
     * <li>{@link RestConstants#JOB_LOG_ACTION}: the scope is a comma separated list of action numbers and
     * <code>start-end</code> ranges, e.g. <code>1,3-5</code>;</li>
     * <li>{@link RestConstants#JOB_LOG_DATE}: the scope is handed to
     * {@link CoordActionsInDateRange#getCoordActionIdsFromDates}.</li>
     * </ul>
     * Any other type, or a null scope/type, leaves the log filtered by job id only.
     *
     * @param jobId Job Id
     * @param logRetrievalScope Value for the retrieval type
     * @param logRetrievalType Based on which filter criteria the log is retrieved
     * @param writer writer to stream the log to
     * @throws IOException declared for the underlying log streaming call.
     * @throws BaseEngineException if the coordinator job cannot be loaded.
     * @throws CommandException with {@link ErrorCode#E0302} if the scope cannot be interpreted.
     */
    public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer)
            throws IOException, BaseEngineException, CommandException {
        XLogStreamer.Filter filter = new XLogStreamer.Filter();
        filter.setParameter(DagXLogInfoService.JOB, jobId);
        if (logRetrievalScope != null && logRetrievalType != null) {
            // if coordinator action logs are to be retrieved based on action id range
            if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_ACTION)) {
                Set<String> actions = parseActionScope(jobId, logRetrievalScope);
                filter.setParameter(DagXLogInfoService.ACTION, toActionFilter(actions));
            }
            // if coordinator action logs are to be retrieved based on date range
            // this block gets the corresponding list of coordinator actions to be used by the log filter
            if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
                List<String> coordActionIdList;
                try {
                    coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId,
                            logRetrievalScope);
                }
                catch (XException xe) {
                    throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
                }
                filter.setParameter(DagXLogInfoService.ACTION, toActionFilter(coordActionIdList));
            }
        }
        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
        Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
    }

    /**
     * Expand a comma separated list of action numbers and <code>start-end</code> ranges into action ids of the
     * form <code>jobId@number</code>.
     *
     * @param jobId job id used as the prefix of every action id.
     * @param scope comma separated action numbers and/or ranges.
     * @return the distinct action ids named by the scope.
     * @throws CommandException with {@link ErrorCode#E0302} if an element is not an integer, or is a range that
     *         is malformed or has <code>start &gt; end</code>.
     */
    private static Set<String> parseActionScope(String jobId, String scope) throws CommandException {
        Set<String> actions = new HashSet<String>();
        for (String element : scope.split(",")) {
            String s = element.trim();
            if (s.contains("-")) {
                String[] range = s.split("-");
                if (range.length != 2) {
                    throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
                            + "'");
                }
                int start;
                int end;
                try {
                    start = Integer.parseInt(range[0].trim());
                    end = Integer.parseInt(range[1].trim());
                }
                catch (NumberFormatException ne) {
                    throw new CommandException(ErrorCode.E0302, ne);
                }
                if (start > end) {
                    throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
                            + "'");
                }
                // A long counter is required: with an int counter, "i <= end" is always true when end is
                // Integer.MAX_VALUE, because i wraps around to a negative value instead of exceeding end.
                for (long i = start; i <= end; i++) {
                    actions.add(jobId + "@" + i);
                }
            }
            else {
                int actionNumber;
                try {
                    actionNumber = Integer.parseInt(s);
                }
                catch (NumberFormatException ne) {
                    throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
                            + "'. Integer only.");
                }
                // Append the parsed number, not the raw text, so that a single id is rendered exactly like
                // the ids produced by the range branch (e.g. "007" and "7-7" both give "jobId@7").
                actions.add(jobId + "@" + actionNumber);
            }
        }
        return actions;
    }

    /**
     * Join action ids with <code>|</code>; when there is more than one id the result is wrapped in parentheses.
     *
     * @param actionIds ids to join, in iteration order.
     * @return the joined ids, or an empty string when there are none.
     */
    private static String toActionFilter(Iterable<String> actionIds) {
        StringBuilder orSeparatedActions = new StringBuilder();
        int count = 0;
        for (String actionId : actionIds) {
            if (count > 0) {
                orSeparatedActions.append("|");
            }
            orSeparatedActions.append(actionId);
            count++;
        }
        if (count > 1) {
            orSeparatedActions.insert(0, "(");
            orSeparatedActions.append(")");
        }
        return orSeparatedActions.toString();
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
     * , boolean)
     */
    @Override
    public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
        // startJob is not read here; the job is always handed to CoordSubmitXCommand as is.
        try {
            CoordSubmitXCommand submit = new CoordSubmitXCommand(conf, getAuthToken());
            return submit.call();
        }
        catch (CommandException ex) {
            throw new CoordinatorEngineException(ex);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.apache.oozie.BaseEngine#dryrunSubmit(org.apache.hadoop.conf.Configuration
     * , boolean)
     */
    @Override
    public String dryrunSubmit(Configuration conf, boolean startJob) throws CoordinatorEngineException {
        // Same as submitJob, except that the command is created with its leading boolean argument set to true.
        try {
            CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf, getAuthToken());
            return submit.call();
        }
        catch (CommandException ex) {
            throw new CoordinatorEngineException(ex);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
     */
    @Override
    public void suspend(String jobId) throws CoordinatorEngineException {
        try {
            new CoordSuspendXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new CoordinatorEngineException(e);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
     */
    @Override
    public WorkflowJob getJob(String jobId) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301));
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
     */
    @Override
    public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301));
    }

    /** Filter names accepted by {@link #parseFilter(String)}. */
    private static final Set<String> FILTER_NAMES = new HashSet<String>();

    static {
        FILTER_NAMES.add(OozieClient.FILTER_USER);
        FILTER_NAMES.add(OozieClient.FILTER_NAME);
        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
        FILTER_NAMES.add(OozieClient.FILTER_ID);
        FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
        FILTER_NAMES.add(OozieClient.FILTER_UNIT);
    }

    /**
     * List coordinator jobs matching a filter.
     *
     * @param filter <code>;</code> separated <code>name=value</code> pairs, or <code>null</code> for no filter;
     *        see {@link #parseFilter(String)}.
     * @param start passed unchanged to {@link CoordJobsXCommand}.
     * @param len passed unchanged to {@link CoordJobsXCommand}.
     * @return CoordinatorJobInfo
     * @throws CoordinatorEngineException if the filter is invalid or the command fails.
     */
    public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
        Map<String, List<String>> filterList = parseFilter(filter);

        try {
            return new CoordJobsXCommand(filterList, start, len).call();
        }
        catch (CommandException ex) {
            throw new CoordinatorEngineException(ex);
        }
    }

    /**
     * Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values.
     * <p>
     * The only accepted name is <code>status</code> (matched ignoring case); every value must be the exact name
     * of a {@link CoordinatorAction.Status} constant.
     *
     * @param filter <code>;</code> separated <code>status=value</code> pairs, or <code>null</code>.
     * @return the status values in the order given; empty when the filter is <code>null</code>.
     * @throws CoordinatorEngineException with {@link ErrorCode#E0421} if an element is not a
     *         <code>name=value</code> pair, the name is not <code>status</code>, or the value is not a valid
     *         status.
     */
    private List<String> parseStatusFilter(String filter) throws CoordinatorEngineException {
        List<String> filterList = new ArrayList<String>();
        if (filter == null) {
            return filterList;
        }
        //split name;value pairs
        StringTokenizer st = new StringTokenizer(filter, ";");
        while (st.hasMoreTokens()) {
            String token = st.nextToken();
            if (!token.contains("=")) {
                throw new CoordinatorEngineException(ErrorCode.E0421, token,
                        "elements must be name=value pairs");
            }
            String[] pair = token.split("=");
            if (pair.length != 2) {
                throw new CoordinatorEngineException(ErrorCode.E0421, token,
                        "elements must be name=value pairs");
            }
            if (!pair[0].equalsIgnoreCase("status")) {
                // Check for incorrect filter option
                throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
                        "invalid filter [{0}]." + " The only valid filter is \"status\"", pair[0]));
            }
            String statusValue = pair[1];
            try {
                CoordinatorAction.Status.valueOf(statusValue);
            }
            catch (IllegalArgumentException ex) {
                StringBuilder validStatusList = new StringBuilder();
                for (CoordinatorAction.Status status : CoordinatorAction.Status.values()) {
                    validStatusList.append(status.toString() + " ");
                }
                // Check for incorrect status value
                throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
                        "invalid status value [{0}]." + " Valid status values are: [{1}]", statusValue,
                        validStatusList));
            }
            filterList.add(statusValue);
        }
        return filterList;
    }

    /**
     * Parse a jobs filter into a map from filter name to the values given for that name.
     * <p>
     * Names are matched ignoring case against {@link #FILTER_NAMES} and stored in lower case, so the key used in
     * the returned map is the same string that was validated. <code>frequency</code> and <code>unit</code> are
     * handled specially: the last value of each wins, a unit without a frequency is rejected, and when a
     * frequency is given the map receives exactly one <code>unit</code> entry (<code>MONTH</code>,
     * <code>DAY</code> or <code>MINUTE</code>; <code>MINUTE</code> when no unit was given) and one
     * <code>frequency</code> entry. A frequency in hours is converted to minutes.
     *
     * @param filter <code>;</code> separated <code>name=value</code> pairs, or <code>null</code>.
     * @return the parsed filter; empty when the filter is <code>null</code>.
     * @throws CoordinatorEngineException with {@link ErrorCode#E0420} if an element is not a
     *         <code>name=value</code> pair, or a name or value is invalid.
     */
    private Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException {
        Map<String, List<String>> map = new HashMap<String, List<String>>();
        if (filter == null) {
            return map;
        }
        boolean isTimeUnitSpecified = false;
        String timeUnit = "MINUTE";
        boolean isFrequencySpecified = false;
        String frequency = "";
        StringTokenizer st = new StringTokenizer(filter, ";");
        while (st.hasMoreTokens()) {
            String token = st.nextToken();
            if (!token.contains("=")) {
                throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
            }
            String[] pair = token.split("=");
            if (pair.length != 2) {
                throw new CoordinatorEngineException(ErrorCode.E0420, filter,
                        "elements must be name=value pairs");
            }
            // Normalize once, independently of the default locale, and use the normalized name for every
            // later check and as the map key. Otherwise "Status=..." would pass the name check but skip the
            // status validation and be stored under a key different from the validated name.
            String name = pair[0].toLowerCase(Locale.ROOT);
            String value = pair[1];
            if (!FILTER_NAMES.contains(name)) {
                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
                        pair[0]));
            }
            if (name.equals("frequency")) {
                isFrequencySpecified = true;
                try {
                    // Fractional values are truncated to an int.
                    // NOTE(review): parsing through float loses precision for very large values.
                    frequency = (int) Float.parseFloat(value) + "";
                }
                catch (NumberFormatException nanException) {
                    throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
                            "invalid value [{0}] for frequency. A numerical value is expected", value));
                }
                continue;
            }
            if (name.equals("unit")) {
                isTimeUnitSpecified = true;
                timeUnit = value;
                if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
                        && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
                    throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
                            "invalid value [{0}] for time unit. "
                                    + "Valid value is one of months, days, hours or minutes", value));
                }
                continue;
            }
            if (name.equals("status")) {
                try {
                    CoordinatorJob.Status.valueOf(value);
                }
                catch (IllegalArgumentException ex) {
                    throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
                            "invalid status [{0}]", value));
                }
            }
            List<String> list = map.get(name);
            if (list == null) {
                list = new ArrayList<String>();
                map.put(name, list);
            }
            list.add(value);
        }
        // Unit is specified and frequency is not specified
        if (!isFrequencySpecified && isTimeUnitSpecified) {
            throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
                    + "frequency is specified. Either specify frequency also or else remove the time unit");
        }
        if (isFrequencySpecified) {
            // Frequency value is specified
            if (isTimeUnitSpecified) {
                if (timeUnit.equalsIgnoreCase("months")) {
                    timeUnit = "MONTH";
                }
                else if (timeUnit.equalsIgnoreCase("days")) {
                    timeUnit = "DAY";
                }
                else if (timeUnit.equalsIgnoreCase("hours")) {
                    // When job details are persisted to database, frequency in hours are converted to minutes.
                    // This conversion is to conform with that.
                    // NOTE(review): the multiplication is done in int and can overflow for huge frequencies.
                    frequency = Integer.parseInt(frequency) * 60 + "";
                    timeUnit = "MINUTE";
                }
                else if (timeUnit.equalsIgnoreCase("minutes")) {
                    timeUnit = "MINUTE";
                }
            }
            // Adding the frequency and time unit filters to the filter map
            List<String> list = new ArrayList<String>();
            list.add(timeUnit);
            map.put("unit", list);
            list = new ArrayList<String>();
            list.add(frequency);
            map.put("frequency", list);
        }
        return map;
    }
}