/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.FaultInjection;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.StoreService;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.store.Store;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.store.WorkflowStore;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.MemoryLocks.LockToken; 046 047 /** 048 * Base class for all synchronous and asynchronous DagEngine commands. 049 */ 050 public abstract class Command<T, S extends Store> implements XCallable<T> { 051 /** 052 * The instrumentation group used for Commands. 053 */ 054 private static final String INSTRUMENTATION_GROUP = "commands"; 055 056 private final long createdTime; 057 058 /** 059 * The instrumentation group used for Jobs. 060 */ 061 private static final String INSTRUMENTATION_JOB_GROUP = "jobs"; 062 063 private static final long LOCK_TIMEOUT = 1000; 064 protected static final long LOCK_FAILURE_REQUEUE_INTERVAL = 30000; 065 066 protected Instrumentation instrumentation; 067 private List<XCallable<Void>> callables; 068 private List<XCallable<Void>> delayedCallables; 069 private long delay = 0; 070 private List<XCallable<Void>> exceptionCallables; 071 private String name; 072 private String type; 073 private String key; 074 private int priority; 075 private int logMask; 076 private boolean withStore; 077 protected boolean dryrun = false; 078 private ArrayList<LockToken> locks = null; 079 080 /** 081 * This variable is package private for testing purposes only. 082 */ 083 XLog.Info logInfo; 084 085 /** 086 * Create a command that uses a {@link WorkflowStore} instance. <p/> The current {@link XLog.Info} values are 087 * captured for execution. 088 * 089 * @param name command name. 090 * @param type command type. 091 * @param priority priority of the command, used when queuing for asynchronous execution. 092 * @param logMask log mask for the command logging calls. 093 */ 094 public Command(String name, String type, int priority, int logMask) { 095 this(name, type, priority, logMask, true); 096 } 097 098 /** 099 * Create a command. <p/> The current {@link XLog.Info} values are captured for execution. 100 * 101 * @param name command name. 102 * @param type command type. 
103 * @param priority priority of the command, used when queuing for asynchronous execution. 104 * @param logMask log mask for the command logging calls. 105 * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not. 106 */ 107 public Command(String name, String type, int priority, int logMask, boolean withStore) { 108 this.name = ParamChecker.notEmpty(name, "name"); 109 this.type = ParamChecker.notEmpty(type, "type"); 110 this.key = name + "_" + UUID.randomUUID(); 111 this.priority = priority; 112 this.withStore = withStore; 113 this.logMask = logMask; 114 instrumentation = Services.get().get(InstrumentationService.class).get(); 115 logInfo = new XLog.Info(XLog.Info.get()); 116 createdTime = System.currentTimeMillis(); 117 locks = new ArrayList<LockToken>(); 118 } 119 120 /** 121 * Create a command. <p/> The current {@link XLog.Info} values are captured for execution. 122 * 123 * @param name command name. 124 * @param type command type. 125 * @param priority priority of the command, used when queuing for asynchronous execution. 126 * @param logMask log mask for the command logging calls. 127 * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not. 128 * @param dryrun indicates if dryrun option is enabled. if enabled coordinator will show a diagnostic output without 129 * really submitting the job 130 */ 131 public Command(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) { 132 this(name, type, priority, logMask, withStore); 133 this.dryrun = dryrun; 134 } 135 136 /** 137 * Return the name of the command. 138 * 139 * @return the name of the command. 140 */ 141 @Override 142 public String getName() { 143 return name; 144 } 145 146 /** 147 * Return the callable type. <p/> The callable type is used for concurrency throttling in the {@link 148 * org.apache.oozie.service.CallableQueueService}. 
149 * 150 * @return the callable type. 151 */ 152 @Override 153 public String getType() { 154 return type; 155 } 156 157 /** 158 * Return the priority of the command. 159 * 160 * @return the priority of the command. 161 */ 162 @Override 163 public int getPriority() { 164 return priority; 165 } 166 167 /** 168 * Returns the createdTime of the callable in milliseconds 169 * 170 * @return the callable createdTime 171 */ 172 @Override 173 public long getCreatedTime() { 174 return createdTime; 175 } 176 177 /** 178 * Execute the command {@link #call(WorkflowStore)} setting all the necessary context. <p/> The {@link XLog.Info} is 179 * set to the values at instance creation time. <p/> The command execution is logged and instrumented. <p/> If a 180 * {@link WorkflowStore} is used, a fresh instance will be passed and it will be commited after the {@link 181 * #call(WorkflowStore)} execution. It will be closed without committing if an exception is thrown. <p/> Commands 182 * queued via the DagCommand queue methods are queued for execution after the workflow store has been committed. 183 * <p/> If an exception happends the queued commands will not be effectively queued for execution. Instead, the the 184 * commands queued for exception will be effectively queued fro execution.. 185 * 186 * @throws CommandException thrown if the command could not be executed successfully, the workflow store is closed 187 * without committing, thus doing a rollback. 
188 */ 189 @SuppressWarnings({"ThrowFromFinallyBlock", "unchecked"}) 190 public final T call() throws CommandException { 191 XLog.Info.get().setParameters(logInfo); 192 XLog log = XLog.getLog(getClass()); 193 log.trace(logMask, "Start"); 194 Instrumentation.Cron cron = new Instrumentation.Cron(); 195 cron.start(); 196 callables = new ArrayList<XCallable<Void>>(); 197 delayedCallables = new ArrayList<XCallable<Void>>(); 198 exceptionCallables = new ArrayList<XCallable<Void>>(); 199 delay = 0; 200 S store = null; 201 boolean exception = false; 202 203 try { 204 if (withStore) { 205 store = (S) Services.get().get(StoreService.class).getStore(getStoreClass()); 206 store.beginTrx(); 207 } 208 T result = execute(store); 209 /* 210 * 211 * if (store != null && log != null) { log.info(XLog.STD, 212 * "connection log from store Flush Mode {0} ", 213 * store.getFlushMode()); } 214 */ 215 if (withStore) { 216 if (store == null) { 217 throw new IllegalStateException("WorkflowStore should not be null"); 218 } 219 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { 220 throw new RuntimeException("Skipping Commit for Failover Testing"); 221 } 222 store.commitTrx(); 223 } 224 225 // TODO figure out the reject due to concurrency problems and remove 226 // the delayed queuing for callables. 
227 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables, 10); 228 if (ret == false) { 229 logQueueCallableFalse(callables); 230 } 231 232 ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, delay); 233 if (ret == false) { 234 logQueueCallableFalse(delayedCallables); 235 } 236 237 return result; 238 } 239 catch (XException ex) { 240 log.error(logMask | XLog.OPS, "XException, {0}", ex.getMessage(), ex); 241 if (store != null) { 242 log.info(XLog.STD, "XException - connection logs from store {0}, {1}", store.getConnection(), store 243 .isClosed()); 244 } 245 exception = true; 246 if (store != null && store.isActive()) { 247 try { 248 store.rollbackTrx(); 249 } 250 catch (RuntimeException rex) { 251 log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex); 252 } 253 } 254 255 // TODO figure out the reject due to concurrency problems and remove 256 // the delayed queuing for callables. 257 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(exceptionCallables, 10); 258 if (ret == false) { 259 logQueueCallableFalse(exceptionCallables); 260 } 261 if (ex instanceof CommandException) { 262 throw (CommandException) ex; 263 } 264 else { 265 throw new CommandException(ex); 266 } 267 } 268 catch (Exception ex) { 269 log.error(logMask | XLog.OPS, "Exception, {0}", ex.getMessage(), ex); 270 exception = true; 271 if (store != null && store.isActive()) { 272 try { 273 store.rollbackTrx(); 274 } 275 catch (RuntimeException rex) { 276 log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex); 277 } 278 } 279 throw new CommandException(ErrorCode.E0607, name, ex.getMessage(), ex); 280 } 281 catch (Error er) { 282 log.error(logMask | XLog.OPS, "Error, {0}", er.getMessage(), er); 283 exception = true; 284 if (store != null && store.isActive()) { 285 try { 286 store.rollbackTrx(); 287 } 288 catch (RuntimeException rex) { 289 log.warn(logMask | XLog.OPS, 
"openjpa error, {0}, {1}", name, rex.getMessage(), rex); 290 } 291 } 292 throw er; 293 } 294 finally { 295 FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection"); 296 cron.stop(); 297 instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron); 298 incrCommandCounter(1); 299 log.trace(logMask, "End"); 300 if (locks != null) { 301 for (LockToken lock : locks) { 302 lock.release(); 303 } 304 locks.clear(); 305 } 306 if (store != null) { 307 if (!store.isActive()) { 308 try { 309 store.closeTrx(); 310 } 311 catch (RuntimeException rex) { 312 if (exception) { 313 log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex); 314 } 315 else { 316 throw rex; 317 } 318 } 319 } 320 else { 321 log.warn(logMask | XLog.OPS, "transaction is not committed or rolled back before closing entitymanager."); 322 } 323 } 324 } 325 } 326 327 /** 328 * Queue a callable for execution after the current callable call invocation completes and the {@link WorkflowStore} 329 * transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are queued for a 330 * single serial execution. <p/> If the call invocation throws an exception all queued callables are discarded, they 331 * are not queued for execution. 332 * 333 * @param callable callable to queue for execution. 334 */ 335 protected void queueCallable(XCallable<Void> callable) { 336 callables.add(callable); 337 } 338 339 /** 340 * Queue a list of callables for execution after the current callable call invocation completes and the {@link 341 * WorkflowStore} transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are 342 * queued for a single serial execution. <p/> If the call invocation throws an exception all queued callables are 343 * discarded, they are not queued for execution. 344 * 345 * @param callables list of callables to queue for execution. 346 */ 347 protected void queueCallable(List<? 
extends XCallable<Void>> callables) { 348 this.callables.addAll(callables); 349 } 350 351 /** 352 * Queue a callable for delayed execution after the current callable call invocation completes and the {@link 353 * WorkflowStore} transaction commits. <p/> All queued delayed callables, regardless of the number of delay queue 354 * invocations, are queued for a single serial delayed execution with the highest delay of all queued callables. 355 * <p/> If the call invocation throws an exception all queued callables are discarded, they are not queued for 356 * execution. 357 * 358 * @param callable callable to queue for delayed execution. 359 * @param delay the queue delay in milliseconds 360 */ 361 protected void queueCallable(XCallable<Void> callable, long delay) { 362 this.delayedCallables.add(callable); 363 this.delay = Math.max(this.delay, delay); 364 } 365 366 /** 367 * Queue a callable for execution only in the event of an exception being thrown during the call invocation. <p/> If 368 * an exception does not happen, all the callables queued by this method are discarded, they are not queued for 369 * execution. <p/> All queued callables, regardless of the number of queue invocations, are queued for a single 370 * serial execution. 371 * 372 * @param callable callable to queue for execution in the case of an exception. 373 */ 374 protected void queueCallableForException(XCallable<Void> callable) { 375 exceptionCallables.add(callable); 376 } 377 378 /** 379 * Logging the info if failed to queue the callables. 380 * 381 * @param callables 382 */ 383 protected void logQueueCallableFalse(List<? 
extends XCallable<Void>> callables) { 384 StringBuilder sb = new StringBuilder( 385 "Unable to queue the callables, delayedQueue is full or system is in SAFEMODE - failed to queue:["); 386 int size = callables.size(); 387 for (int i = 0; i < size; i++) { 388 XCallable<Void> callable = callables.get(i); 389 sb.append(callable.getName()); 390 if (i < size - 1) { 391 sb.append(", "); 392 } 393 else { 394 sb.append("]"); 395 } 396 } 397 XLog.getLog(getClass()).warn(sb.toString()); 398 } 399 400 /** 401 * DagCallable subclasses must implement this method to perform their task. <p/> The workflow store works in 402 * transactional mode. The transaction is committed only if this method ends successfully. Otherwise the transaction 403 * is rolledback. 404 * 405 * @param store the workflow store instance for the callable, <code>null</code> if the callable does not use a 406 * store. 407 * @return the return value of the callable. 408 * @throws StoreException thrown if the workflow store could not perform an operation. 409 * @throws CommandException thrown if the command could not perform its operation. 410 */ 411 protected abstract T call(S store) throws StoreException, CommandException; 412 413 // to do 414 // need to implement on all sub commands and break down the transactions 415 416 // protected abstract T execute(String id) throws CommandException; 417 418 /** 419 * Command subclasses must implement this method correct Store can be passed to call(store); 420 * 421 * @return the Store class for use by Callable 422 * @throws CommandException thrown if the command could not perform its operation. 423 */ 424 protected abstract Class<? extends Store> getStoreClass(); 425 426 /** 427 * Set the log info with the context of the given coordinator bean. 428 * 429 * @param cBean coordinator bean. 
430 */ 431 protected void setLogInfo(CoordinatorJobBean cBean) { 432 if (logInfo.getParameter(XLogService.GROUP) == null) { 433 logInfo.setParameter(XLogService.GROUP, cBean.getGroup()); 434 } 435 if (logInfo.getParameter(XLogService.USER) == null) { 436 logInfo.setParameter(XLogService.USER, cBean.getUser()); 437 } 438 logInfo.setParameter(DagXLogInfoService.JOB, cBean.getId()); 439 logInfo.setParameter(DagXLogInfoService.TOKEN, ""); 440 logInfo.setParameter(DagXLogInfoService.APP, cBean.getAppName()); 441 XLog.Info.get().setParameters(logInfo); 442 } 443 444 /** 445 * Set the log info with the context of the given coordinator action bean. 446 * 447 * @param action action bean. 448 */ 449 protected void setLogInfo(CoordinatorActionBean action) { 450 logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId()); 451 // logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken()); 452 logInfo.setParameter(DagXLogInfoService.ACTION, action.getId()); 453 XLog.Info.get().setParameters(logInfo); 454 } 455 456 /** 457 * Set the log info with the context of the given workflow bean. 458 * 459 * @param workflow workflow bean. 460 */ 461 protected void setLogInfo(WorkflowJobBean workflow) { 462 if (logInfo.getParameter(XLogService.GROUP) == null) { 463 logInfo.setParameter(XLogService.GROUP, workflow.getGroup()); 464 } 465 if (logInfo.getParameter(XLogService.USER) == null) { 466 logInfo.setParameter(XLogService.USER, workflow.getUser()); 467 } 468 logInfo.setParameter(DagXLogInfoService.JOB, workflow.getId()); 469 logInfo.setParameter(DagXLogInfoService.TOKEN, workflow.getLogToken()); 470 logInfo.setParameter(DagXLogInfoService.APP, workflow.getAppName()); 471 XLog.Info.get().setParameters(logInfo); 472 } 473 474 /** 475 * Set the log info with the context of the given action bean. 476 * 477 * @param action action bean. 
478 */ 479 protected void setLogInfo(WorkflowActionBean action) { 480 logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId()); 481 logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken()); 482 logInfo.setParameter(DagXLogInfoService.ACTION, action.getId()); 483 XLog.Info.get().setParameters(logInfo); 484 } 485 486 /** 487 * Reset the action bean information from the log info. 488 */ 489 // TODO check if they are used, else delete 490 protected void resetLogInfoAction() { 491 logInfo.clearParameter(DagXLogInfoService.ACTION); 492 XLog.Info.get().clearParameter(DagXLogInfoService.ACTION); 493 } 494 495 /** 496 * Reset the workflow bean information from the log info. 497 */ 498 // TODO check if they are used, else delete 499 protected void resetLogInfoWorkflow() { 500 logInfo.clearParameter(DagXLogInfoService.JOB); 501 logInfo.clearParameter(DagXLogInfoService.APP); 502 logInfo.clearParameter(DagXLogInfoService.TOKEN); 503 XLog.Info.get().clearParameter(DagXLogInfoService.JOB); 504 XLog.Info.get().clearParameter(DagXLogInfoService.APP); 505 XLog.Info.get().clearParameter(DagXLogInfoService.TOKEN); 506 } 507 508 /** 509 * Convenience method to increment counters. 510 * 511 * @param group the group name. 512 * @param name the counter name. 513 * @param count increment count. 514 */ 515 private void incrCounter(String group, String name, int count) { 516 if (instrumentation != null) { 517 instrumentation.incr(group, name, count); 518 } 519 } 520 521 /** 522 * Used to increment command counters. 523 * 524 * @param count the increment count. 525 */ 526 protected void incrCommandCounter(int count) { 527 incrCounter(INSTRUMENTATION_GROUP, name, count); 528 } 529 530 /** 531 * Used to increment job counters. The counter name s the same as the command name. 532 * 533 * @param count the increment count. 534 */ 535 protected void incrJobCounter(int count) { 536 incrJobCounter(name, count); 537 } 538 539 /** 540 * Used to increment job counters. 
541 * 542 * @param name the job name. 543 * @param count the increment count. 544 */ 545 protected void incrJobCounter(String name, int count) { 546 incrCounter(INSTRUMENTATION_JOB_GROUP, name, count); 547 } 548 549 /** 550 * Return the {@link Instrumentation} instance in use. 551 * 552 * @return the {@link Instrumentation} instance in use. 553 */ 554 protected Instrumentation getInstrumentation() { 555 return instrumentation; 556 } 557 558 /** 559 * Return the identity. 560 * 561 * @return the identity. 562 */ 563 @Override 564 public String toString() { 565 StringBuilder sb = new StringBuilder(); 566 sb.append(getType()); 567 sb.append(",").append(getPriority()); 568 return sb.toString(); 569 } 570 571 protected boolean lock(String id) throws InterruptedException { 572 if (id == null || id.length() == 0) { 573 XLog.getLog(getClass()).warn("lock(): Id is null or empty :" + id + ":"); 574 return false; 575 } 576 LockToken token = Services.get().get(MemoryLocksService.class).getWriteLock(id, LOCK_TIMEOUT); 577 if (token != null) { 578 locks.add(token); 579 return true; 580 } 581 else { 582 return false; 583 } 584 } 585 586 /* 587 * TODO - remove store coupling to EM. Store will only contain queries 588 * protected EntityManager getEntityManager() { return 589 * store.getEntityManager(); } 590 */ 591 protected T execute(S store) throws CommandException, StoreException { 592 T result = call(store); 593 return result; 594 } 595 596 /** 597 * Get command key 598 * 599 * @return command key 600 */ 601 @Override 602 public String getKey(){ 603 return this.key; 604 } 605 606 /** 607 * Get command lock key returning the key as an entity key, [not used] Just 608 * to be able to implement XCallable [to be deprecated] 609 * 610 * @return key 611 */ 612 @Override 613 public String getEntityKey() { 614 return this.key; 615 } 616 617 /** 618 * set the mode of execution for the callable. 
True if in interrupt, false 619 * if not [to be deprecated] 620 */ 621 public void setInterruptMode(boolean mode) { 622 } 623 624 /** 625 * [to be deprecated] 626 * 627 * @return the mode of execution. true if it is executed as an Interrupt, 628 * false otherwise 629 */ 630 public boolean inInterruptMode() { 631 return false; 632 } 633 634 }