/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.FaultInjection;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.StoreService;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.store.Store;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.store.WorkflowStore;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import org.apache.oozie.lock.LockToken;

/**
 * Base class for all synchronous and asynchronous DagEngine commands.
 * <p>
 * {@link #call()} is the template method: it optionally opens a store transaction, delegates to
 * {@link #execute(Store)}, commits or rolls back, queues the follow-up callables and finally releases the locks
 * acquired through {@link #lock(String)}.
 *
 * @param <T> the result type of the command.
 * @param <S> the {@link Store} type handed to the command.
 */
public abstract class Command<T, S extends Store> implements XCallable<T> {
    /**
     * The instrumentation group used for Commands.
     */
    private static final String INSTRUMENTATION_GROUP = "commands";

    /**
     * The instrumentation group used for Jobs.
     */
    private static final String INSTRUMENTATION_JOB_GROUP = "jobs";

    /**
     * Name of the fault injection that makes {@link #call()} fail instead of committing.
     */
    private static final String SKIP_COMMIT_FAULT_INJECTION = "org.apache.oozie.command.SkipCommitFaultInjection";

    /**
     * Timeout passed to the memory lock service when acquiring a write lock (presumably milliseconds - TODO
     * confirm against MemoryLocksService).
     */
    private static final long LOCK_TIMEOUT = 1000;

    /**
     * Not referenced in this class; presumably the requeue delay subclasses use when {@link #lock(String)} fails -
     * TODO confirm against the subclasses.
     */
    protected static final long LOCK_FAILURE_REQUEUE_INTERVAL = 30000;

    private final long createdTime;

    protected Instrumentation instrumentation;

    // The three callable lists and the delay are (re)created at the start of every call() invocation.
    private List<XCallable<Void>> callables;
    private List<XCallable<Void>> delayedCallables;
    private long delay = 0;
    private List<XCallable<Void>> exceptionCallables;

    private final String name;
    private final String type;
    private final String key;
    private final int priority;
    private final int logMask;
    private final boolean withStore;
    protected boolean dryrun = false;

    /**
     * Lock tokens acquired through {@link #lock(String)}; released at the end of {@link #call()}.
     */
    private final List<LockToken> locks = new ArrayList<LockToken>();

    /**
     * This variable is package private for testing purposes only.
     */
    XLog.Info logInfo;

    /**
     * Create a command that uses a store instance.
     * <p>
     * The current {@link XLog.Info} values are captured for execution.
     *
     * @param name command name.
     * @param type command type.
     * @param priority priority of the command, used when queuing for asynchronous execution.
     * @param logMask log mask for the command logging calls.
     */
    public Command(String name, String type, int priority, int logMask) {
        this(name, type, priority, logMask, true);
    }

    /**
     * Create a command.
     * <p>
     * The current {@link XLog.Info} values are captured for execution.
     *
     * @param name command name, must not be empty.
     * @param type command type, must not be empty.
     * @param priority priority of the command, used when queuing for asynchronous execution.
     * @param logMask log mask for the command logging calls.
     * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
     */
    public Command(String name, String type, int priority, int logMask, boolean withStore) {
        this.name = ParamChecker.notEmpty(name, "name");
        this.type = ParamChecker.notEmpty(type, "type");
        // The key is the command name plus a random UUID.
        this.key = name + "_" + UUID.randomUUID();
        this.priority = priority;
        this.withStore = withStore;
        this.logMask = logMask;
        instrumentation = Services.get().get(InstrumentationService.class).get();
        // Snapshot the caller's log info so call() can restore it on whatever thread runs the command.
        logInfo = new XLog.Info(XLog.Info.get());
        createdTime = System.currentTimeMillis();
    }

    /**
     * Create a command.
     * <p>
     * The current {@link XLog.Info} values are captured for execution.
     *
     * @param name command name.
     * @param type command type.
     * @param priority priority of the command, used when queuing for asynchronous execution.
     * @param logMask log mask for the command logging calls.
     * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
     * @param dryrun indicates if dryrun option is enabled. if enabled coordinator will show a diagnostic output without
     * really submitting the job
     */
    public Command(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
        this(name, type, priority, logMask, withStore);
        this.dryrun = dryrun;
    }

    /**
     * Return the name of the command.
     *
     * @return the name of the command.
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Return the callable type.
     * <p>
     * The callable type is used for concurrency throttling in the {@link
     * org.apache.oozie.service.CallableQueueService}.
     *
     * @return the callable type.
     */
    @Override
    public String getType() {
        return type;
    }

    /**
     * Return the priority of the command.
     *
     * @return the priority of the command.
     */
    @Override
    public int getPriority() {
        return priority;
    }

    /**
     * Returns the createdTime of the callable in milliseconds
     *
     * @return the callable createdTime
     */
    @Override
    public long getCreatedTime() {
        return createdTime;
    }

    /**
     * Execute the command via {@link #execute(Store)}, setting all the necessary context.
     * <p>
     * The {@link XLog.Info} is set to the values captured at instance creation time. The command execution is
     * logged and instrumented.
     * <p>
     * If a store is used, a fresh instance is obtained, a transaction is begun, and the transaction is committed
     * after the execution completes. If an exception or error is thrown the transaction is rolled back instead.
     * <p>
     * Callables queued via the {@code queueCallable} methods are queued for execution only after the store
     * transaction has been committed. If an {@link XException} is thrown they are not queued; instead, the
     * callables registered via {@link #queueCallableForException(XCallable)} are queued. For any other exception or
     * error nothing is queued.
     * <p>
     * All locks acquired through {@link #lock(String)} are released before this method returns.
     *
     * @return the result of the command.
     * @throws CommandException thrown if the command could not be executed successfully, the workflow store is closed
     * without committing, thus doing a rollback.
     */
    @SuppressWarnings({"ThrowFromFinallyBlock", "unchecked"})
    public final T call() throws CommandException {
        XLog.Info.get().setParameters(logInfo);
        XLog log = XLog.getLog(getClass());
        log.trace(logMask, "Start");
        Instrumentation.Cron cron = new Instrumentation.Cron();
        cron.start();
        callables = new ArrayList<XCallable<Void>>();
        delayedCallables = new ArrayList<XCallable<Void>>();
        exceptionCallables = new ArrayList<XCallable<Void>>();
        delay = 0;
        S store = null;
        boolean exception = false;

        try {
            if (withStore) {
                store = (S) Services.get().get(StoreService.class).getStore(getStoreClass());
                store.beginTrx();
            }
            T result = execute(store);
            if (withStore) {
                if (store == null) {
                    throw new IllegalStateException("WorkflowStore should not be null");
                }
                if (FaultInjection.isActive(SKIP_COMMIT_FAULT_INJECTION)) {
                    throw new RuntimeException("Skipping Commit for Failover Testing");
                }
                store.commitTrx();
            }

            // TODO figure out the reject due to concurrency problems and remove
            // the delayed queuing for callables.
            queueOrLog(callables, 10);
            queueOrLog(delayedCallables, delay);

            return result;
        }
        catch (XException ex) {
            log.error(logMask | XLog.OPS, "XException, {0}", ex.getMessage(), ex);
            if (store != null) {
                log.info(XLog.STD, "XException - connection logs from store {0}, {1}", store.getConnection(), store
                        .isClosed());
            }
            exception = true;
            rollbackQuietly(store, log);

            // TODO figure out the reject due to concurrency problems and remove
            // the delayed queuing for callables.
            queueOrLog(exceptionCallables, 10);

            if (ex instanceof CommandException) {
                throw (CommandException) ex;
            }
            throw new CommandException(ex);
        }
        catch (Exception ex) {
            log.error(logMask | XLog.OPS, "Exception, {0}", ex.getMessage(), ex);
            exception = true;
            rollbackQuietly(store, log);
            throw new CommandException(ErrorCode.E0607, name, ex.getMessage(), ex);
        }
        catch (Error er) {
            log.error(logMask | XLog.OPS, "Error, {0}", er.getMessage(), er);
            exception = true;
            rollbackQuietly(store, log);
            throw er;
        }
        finally {
            FaultInjection.deactivate(SKIP_COMMIT_FAULT_INJECTION);
            cron.stop();
            instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron);
            InstrumentUtils.incrCommandCounter(name, 1, instrumentation);
            log.trace(logMask, "End");
            try {
                releaseLocks();
            }
            finally {
                // Nested finally: the store must be closed even if releasing a lock throws.
                closeStore(store, exception, log);
            }
        }
    }

    /**
     * Queue the given callables for serial execution and log a warning if the queue service rejects them.
     *
     * @param toQueue callables to queue.
     * @param queueDelay delay handed to the queue service.
     */
    private void queueOrLog(List<XCallable<Void>> toQueue, long queueDelay) {
        boolean queued = Services.get().get(CallableQueueService.class).queueSerial(toQueue, queueDelay);
        if (!queued) {
            logQueueCallableFalse(toQueue);
        }
    }

    /**
     * Roll back the store transaction if one is active; a runtime failure of the rollback is logged, not rethrown.
     *
     * @param store the store in use, may be <code>null</code>.
     * @param log the log to report rollback failures to.
     */
    private void rollbackQuietly(S store, XLog log) {
        if (store != null && store.isActive()) {
            try {
                store.rollbackTrx();
            }
            catch (RuntimeException rex) {
                log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
            }
        }
    }

    /**
     * Release every lock acquired through {@link #lock(String)} and forget the tokens.
     */
    private void releaseLocks() {
        for (LockToken lock : locks) {
            lock.release();
        }
        locks.clear();
    }

    /**
     * Close the store transaction context once the transaction is no longer active.
     * <p>
     * A runtime failure while closing is only logged when the command already failed, so that it does not mask the
     * original exception; otherwise it is rethrown.
     *
     * @param store the store in use, may be <code>null</code>.
     * @param exception <code>true</code> if the command execution already failed.
     * @param log the log to report problems to.
     */
    private void closeStore(S store, boolean exception, XLog log) {
        if (store == null) {
            return;
        }
        if (store.isActive()) {
            log.warn(logMask | XLog.OPS, "transaction is not committed or rolled back before closing entitymanager.");
            return;
        }
        try {
            store.closeTrx();
        }
        catch (RuntimeException rex) {
            if (!exception) {
                throw rex;
            }
            log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
        }
    }

    /**
     * Queue a callable for execution after the current callable call invocation completes and the store transaction
     * commits.
     * <p>
     * All queued callables, regardless of the number of queue invocations, are queued for a single serial
     * execution. If the call invocation throws an exception all queued callables are discarded, they are not queued
     * for execution.
     * <p>
     * The backing list is created by {@link #call()}, so this must only be invoked while the command is executing.
     *
     * @param callable callable to queue for execution.
     */
    protected void queueCallable(XCallable<Void> callable) {
        callables.add(callable);
    }

    /**
     * Queue a list of callables for execution after the current callable call invocation completes and the store
     * transaction commits.
     * <p>
     * All queued callables, regardless of the number of queue invocations, are queued for a single serial
     * execution. If the call invocation throws an exception all queued callables are discarded, they are not queued
     * for execution.
     *
     * @param callables list of callables to queue for execution.
     */
    protected void queueCallable(List<? extends XCallable<Void>> callables) {
        this.callables.addAll(callables);
    }

    /**
     * Queue a callable for delayed execution after the current callable call invocation completes and the store
     * transaction commits.
     * <p>
     * All queued delayed callables, regardless of the number of delay queue invocations, are queued for a single
     * serial delayed execution with the highest delay of all queued callables. If the call invocation throws an
     * exception all queued callables are discarded, they are not queued for execution.
     *
     * @param callable callable to queue for delayed execution.
     * @param delay the queue delay in milliseconds
     */
    protected void queueCallable(XCallable<Void> callable, long delay) {
        this.delayedCallables.add(callable);
        this.delay = Math.max(this.delay, delay);
    }

    /**
     * Queue a callable for execution only in the event of an {@link XException} being thrown during the call
     * invocation.
     * <p>
     * If no such exception happens, all the callables queued by this method are discarded, they are not queued for
     * execution. All queued callables, regardless of the number of queue invocations, are queued for a single
     * serial execution.
     *
     * @param callable callable to queue for execution in the case of an exception.
     */
    protected void queueCallableForException(XCallable<Void> callable) {
        exceptionCallables.add(callable);
    }

    /**
     * Log a warning listing the names of the callables that could not be queued.
     *
     * @param callables the callables that were rejected by the queue service.
     */
    protected void logQueueCallableFalse(List<? extends XCallable<Void>> callables) {
        StringBuilder sb = new StringBuilder(
                "Unable to queue the callables, delayedQueue is full or system is in SAFEMODE - failed to queue:[");
        String separator = "";
        for (XCallable<Void> callable : callables) {
            sb.append(separator).append(callable.getName());
            separator = ", ";
        }
        // Always close the bracket, including for an empty list.
        sb.append("]");
        XLog.getLog(getClass()).warn(sb.toString());
    }

    /**
     * DagCallable subclasses must implement this method to perform their task.
     * <p>
     * The workflow store works in transactional mode. The transaction is committed only if this method ends
     * successfully. Otherwise the transaction is rolled back.
     *
     * @param store the workflow store instance for the callable, <code>null</code> if the callable does not use a
     * store.
     * @return the return value of the callable.
     * @throws StoreException thrown if the workflow store could not perform an operation.
     * @throws CommandException thrown if the command could not perform its operation.
     */
    protected abstract T call(S store) throws StoreException, CommandException;

    // to do
    // need to implement on all sub commands and break down the transactions

    // protected abstract T execute(String id) throws CommandException;

    /**
     * Command subclasses must implement this method so that the correct Store can be passed to call(store).
     *
     * @return the Store class for use by Callable
     */
    protected abstract Class<? extends Store> getStoreClass();

    /**
     * Set the log info with the context of the given coordinator bean.
     * <p>
     * Group and user are only set when not already present.
     *
     * @param cBean coordinator bean.
     */
    protected void setLogInfo(CoordinatorJobBean cBean) {
        if (logInfo.getParameter(XLogService.GROUP) == null) {
            logInfo.setParameter(XLogService.GROUP, cBean.getGroup());
        }
        if (logInfo.getParameter(XLogService.USER) == null) {
            logInfo.setParameter(XLogService.USER, cBean.getUser());
        }
        logInfo.setParameter(DagXLogInfoService.JOB, cBean.getId());
        logInfo.setParameter(DagXLogInfoService.TOKEN, "");
        logInfo.setParameter(DagXLogInfoService.APP, cBean.getAppName());
        XLog.Info.get().setParameters(logInfo);
    }

    /**
     * Set the log info with the context of the given coordinator action bean.
     *
     * @param action action bean.
     */
    protected void setLogInfo(CoordinatorActionBean action) {
        logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
        // logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
        logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
        XLog.Info.get().setParameters(logInfo);
    }

    /**
     * Set the log info with the context of the given workflow bean.
     * <p>
     * Group and user are only set when not already present.
     *
     * @param workflow workflow bean.
     */
    protected void setLogInfo(WorkflowJobBean workflow) {
        if (logInfo.getParameter(XLogService.GROUP) == null) {
            logInfo.setParameter(XLogService.GROUP, workflow.getGroup());
        }
        if (logInfo.getParameter(XLogService.USER) == null) {
            logInfo.setParameter(XLogService.USER, workflow.getUser());
        }
        logInfo.setParameter(DagXLogInfoService.JOB, workflow.getId());
        logInfo.setParameter(DagXLogInfoService.TOKEN, workflow.getLogToken());
        logInfo.setParameter(DagXLogInfoService.APP, workflow.getAppName());
        XLog.Info.get().setParameters(logInfo);
    }

    /**
     * Set the log info with the context of the given action bean.
     *
     * @param action action bean.
     */
    protected void setLogInfo(WorkflowActionBean action) {
        logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
        logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
        logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
        XLog.Info.get().setParameters(logInfo);
    }

    /**
     * Reset the action bean information from the log info.
     */
    // TODO check if they are used, else delete
    protected void resetLogInfoAction() {
        logInfo.clearParameter(DagXLogInfoService.ACTION);
        XLog.Info.get().clearParameter(DagXLogInfoService.ACTION);
    }

    /**
     * Reset the workflow bean information from the log info.
     */
    // TODO check if they are used, else delete
    protected void resetLogInfoWorkflow() {
        logInfo.clearParameter(DagXLogInfoService.JOB);
        logInfo.clearParameter(DagXLogInfoService.APP);
        logInfo.clearParameter(DagXLogInfoService.TOKEN);
        XLog.Info.get().clearParameter(DagXLogInfoService.JOB);
        XLog.Info.get().clearParameter(DagXLogInfoService.APP);
        XLog.Info.get().clearParameter(DagXLogInfoService.TOKEN);
    }

    /**
     * Return the {@link Instrumentation} instance in use.
     *
     * @return the {@link Instrumentation} instance in use.
     */
    protected Instrumentation getInstrumentation() {
        return instrumentation;
    }

    /**
     * Return the identity, formatted as <code>type,priority</code>.
     *
     * @return the identity.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(getType());
        sb.append(",").append(getPriority());
        return sb.toString();
    }

    /**
     * Try to acquire a write lock for the given id from the {@link MemoryLocksService}.
     * <p>
     * An acquired lock is remembered and released at the end of {@link #call()}.
     *
     * @param id the id to lock; a <code>null</code> or empty id is logged and refused.
     * @return <code>true</code> if the lock was acquired, <code>false</code> if the id was null/empty or no lock
     * token was obtained.
     * @throws InterruptedException declared for the lock acquisition call.
     */
    protected boolean lock(String id) throws InterruptedException {
        if (id == null || id.length() == 0) {
            XLog.getLog(getClass()).warn("lock(): Id is null or empty :" + id + ":");
            return false;
        }
        LockToken token = Services.get().get(MemoryLocksService.class).getWriteLock(id, LOCK_TIMEOUT);
        if (token == null) {
            return false;
        }
        locks.add(token);
        return true;
    }

    /*
     * TODO - remove store coupling to EM. Store will only contain queries
     * protected EntityManager getEntityManager() { return
     * store.getEntityManager(); }
     */

    /**
     * Run the command body; by default this simply delegates to {@link #call(Store)}.
     *
     * @param store the store instance for the command, <code>null</code> if the command does not use a store.
     * @return the result of the command.
     * @throws CommandException thrown if the command could not perform its operation.
     * @throws StoreException thrown if the store could not perform an operation.
     */
    protected T execute(S store) throws CommandException, StoreException {
        return call(store);
    }

    /**
     * Get command key
     *
     * @return command key
     */
    @Override
    public String getKey() {
        return this.key;
    }

    /**
     * Get command lock key returning the key as an entity key, [not used] Just
     * to be able to implement XCallable [to be deprecated]
     *
     * @return key
     */
    @Override
    public String getEntityKey() {
        return this.key;
    }

    /**
     * set the mode of execution for the callable. True if in interrupt, false
     * if not [to be deprecated]
     * <p>
     * This implementation ignores the value.
     *
     * @param mode the requested mode; ignored.
     */
    public void setInterruptMode(boolean mode) {
    }

    /**
     * [to be deprecated]
     *
     * @return the mode of execution. true if it is executed as an Interrupt,
     * false otherwise; this implementation always returns <code>false</code>.
     */
    public boolean inInterruptMode() {
        return false;
    }

}