This project has retired. For details, please refer to its Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command;
019
020 import java.util.ArrayList;
021 import java.util.List;
022 import java.util.UUID;
023
024 import org.apache.oozie.CoordinatorActionBean;
025 import org.apache.oozie.CoordinatorJobBean;
026 import org.apache.oozie.ErrorCode;
027 import org.apache.oozie.FaultInjection;
028 import org.apache.oozie.WorkflowActionBean;
029 import org.apache.oozie.WorkflowJobBean;
030 import org.apache.oozie.XException;
031 import org.apache.oozie.service.CallableQueueService;
032 import org.apache.oozie.service.DagXLogInfoService;
033 import org.apache.oozie.service.InstrumentationService;
034 import org.apache.oozie.service.MemoryLocksService;
035 import org.apache.oozie.service.Services;
036 import org.apache.oozie.service.StoreService;
037 import org.apache.oozie.service.XLogService;
038 import org.apache.oozie.store.Store;
039 import org.apache.oozie.store.StoreException;
040 import org.apache.oozie.store.WorkflowStore;
041 import org.apache.oozie.util.Instrumentation;
042 import org.apache.oozie.util.ParamChecker;
043 import org.apache.oozie.util.XCallable;
044 import org.apache.oozie.util.XLog;
045 import org.apache.oozie.util.MemoryLocks.LockToken;
046
047 /**
048 * Base class for all synchronous and asynchronous DagEngine commands.
049 */
050 public abstract class Command<T, S extends Store> implements XCallable<T> {
051 /**
052 * The instrumentation group used for Commands.
053 */
054 private static final String INSTRUMENTATION_GROUP = "commands";
055
056 private final long createdTime;
057
058 /**
059 * The instrumentation group used for Jobs.
060 */
061 private static final String INSTRUMENTATION_JOB_GROUP = "jobs";
062
063 private static final long LOCK_TIMEOUT = 1000;
064 protected static final long LOCK_FAILURE_REQUEUE_INTERVAL = 30000;
065
066 protected Instrumentation instrumentation;
067 private List<XCallable<Void>> callables;
068 private List<XCallable<Void>> delayedCallables;
069 private long delay = 0;
070 private List<XCallable<Void>> exceptionCallables;
071 private String name;
072 private String type;
073 private String key;
074 private int priority;
075 private int logMask;
076 private boolean withStore;
077 protected boolean dryrun = false;
078 private ArrayList<LockToken> locks = null;
079
080 /**
081 * This variable is package private for testing purposes only.
082 */
083 XLog.Info logInfo;
084
085 /**
086 * Create a command that uses a {@link WorkflowStore} instance. <p/> The current {@link XLog.Info} values are
087 * captured for execution.
088 *
089 * @param name command name.
090 * @param type command type.
091 * @param priority priority of the command, used when queuing for asynchronous execution.
092 * @param logMask log mask for the command logging calls.
093 */
094 public Command(String name, String type, int priority, int logMask) {
095 this(name, type, priority, logMask, true);
096 }
097
098 /**
099 * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
100 *
101 * @param name command name.
102 * @param type command type.
103 * @param priority priority of the command, used when queuing for asynchronous execution.
104 * @param logMask log mask for the command logging calls.
105 * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
106 */
107 public Command(String name, String type, int priority, int logMask, boolean withStore) {
108 this.name = ParamChecker.notEmpty(name, "name");
109 this.type = ParamChecker.notEmpty(type, "type");
110 this.key = name + "_" + UUID.randomUUID();
111 this.priority = priority;
112 this.withStore = withStore;
113 this.logMask = logMask;
114 instrumentation = Services.get().get(InstrumentationService.class).get();
115 logInfo = new XLog.Info(XLog.Info.get());
116 createdTime = System.currentTimeMillis();
117 locks = new ArrayList<LockToken>();
118 }
119
120 /**
121 * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
122 *
123 * @param name command name.
124 * @param type command type.
125 * @param priority priority of the command, used when queuing for asynchronous execution.
126 * @param logMask log mask for the command logging calls.
127 * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
128 * @param dryrun indicates if dryrun option is enabled. if enabled coordinator will show a diagnostic output without
129 * really submitting the job
130 */
131 public Command(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
132 this(name, type, priority, logMask, withStore);
133 this.dryrun = dryrun;
134 }
135
136 /**
137 * Return the name of the command.
138 *
139 * @return the name of the command.
140 */
141 @Override
142 public String getName() {
143 return name;
144 }
145
146 /**
147 * Return the callable type. <p/> The callable type is used for concurrency throttling in the {@link
148 * org.apache.oozie.service.CallableQueueService}.
149 *
150 * @return the callable type.
151 */
152 @Override
153 public String getType() {
154 return type;
155 }
156
157 /**
158 * Return the priority of the command.
159 *
160 * @return the priority of the command.
161 */
162 @Override
163 public int getPriority() {
164 return priority;
165 }
166
167 /**
168 * Returns the createdTime of the callable in milliseconds
169 *
170 * @return the callable createdTime
171 */
172 @Override
173 public long getCreatedTime() {
174 return createdTime;
175 }
176
177 /**
178 * Execute the command {@link #call(WorkflowStore)} setting all the necessary context. <p/> The {@link XLog.Info} is
179 * set to the values at instance creation time. <p/> The command execution is logged and instrumented. <p/> If a
180 * {@link WorkflowStore} is used, a fresh instance will be passed and it will be commited after the {@link
181 * #call(WorkflowStore)} execution. It will be closed without committing if an exception is thrown. <p/> Commands
182 * queued via the DagCommand queue methods are queued for execution after the workflow store has been committed.
183 * <p/> If an exception happends the queued commands will not be effectively queued for execution. Instead, the the
184 * commands queued for exception will be effectively queued fro execution..
185 *
186 * @throws CommandException thrown if the command could not be executed successfully, the workflow store is closed
187 * without committing, thus doing a rollback.
188 */
189 @SuppressWarnings({"ThrowFromFinallyBlock", "unchecked"})
190 public final T call() throws CommandException {
191 XLog.Info.get().setParameters(logInfo);
192 XLog log = XLog.getLog(getClass());
193 log.trace(logMask, "Start");
194 Instrumentation.Cron cron = new Instrumentation.Cron();
195 cron.start();
196 callables = new ArrayList<XCallable<Void>>();
197 delayedCallables = new ArrayList<XCallable<Void>>();
198 exceptionCallables = new ArrayList<XCallable<Void>>();
199 delay = 0;
200 S store = null;
201 boolean exception = false;
202
203 try {
204 if (withStore) {
205 store = (S) Services.get().get(StoreService.class).getStore(getStoreClass());
206 store.beginTrx();
207 }
208 T result = execute(store);
209 /*
210 *
211 * if (store != null && log != null) { log.info(XLog.STD,
212 * "connection log from store Flush Mode {0} ",
213 * store.getFlushMode()); }
214 */
215 if (withStore) {
216 if (store == null) {
217 throw new IllegalStateException("WorkflowStore should not be null");
218 }
219 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
220 throw new RuntimeException("Skipping Commit for Failover Testing");
221 }
222 store.commitTrx();
223 }
224
225 // TODO figure out the reject due to concurrency problems and remove
226 // the delayed queuing for callables.
227 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables, 10);
228 if (ret == false) {
229 logQueueCallableFalse(callables);
230 }
231
232 ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, delay);
233 if (ret == false) {
234 logQueueCallableFalse(delayedCallables);
235 }
236
237 return result;
238 }
239 catch (XException ex) {
240 log.error(logMask | XLog.OPS, "XException, {0}", ex.getMessage(), ex);
241 if (store != null) {
242 log.info(XLog.STD, "XException - connection logs from store {0}, {1}", store.getConnection(), store
243 .isClosed());
244 }
245 exception = true;
246 if (store != null && store.isActive()) {
247 try {
248 store.rollbackTrx();
249 }
250 catch (RuntimeException rex) {
251 log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
252 }
253 }
254
255 // TODO figure out the reject due to concurrency problems and remove
256 // the delayed queuing for callables.
257 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(exceptionCallables, 10);
258 if (ret == false) {
259 logQueueCallableFalse(exceptionCallables);
260 }
261 if (ex instanceof CommandException) {
262 throw (CommandException) ex;
263 }
264 else {
265 throw new CommandException(ex);
266 }
267 }
268 catch (Exception ex) {
269 log.error(logMask | XLog.OPS, "Exception, {0}", ex.getMessage(), ex);
270 exception = true;
271 if (store != null && store.isActive()) {
272 try {
273 store.rollbackTrx();
274 }
275 catch (RuntimeException rex) {
276 log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
277 }
278 }
279 throw new CommandException(ErrorCode.E0607, name, ex.getMessage(), ex);
280 }
281 catch (Error er) {
282 log.error(logMask | XLog.OPS, "Error, {0}", er.getMessage(), er);
283 exception = true;
284 if (store != null && store.isActive()) {
285 try {
286 store.rollbackTrx();
287 }
288 catch (RuntimeException rex) {
289 log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
290 }
291 }
292 throw er;
293 }
294 finally {
295 FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
296 cron.stop();
297 instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron);
298 incrCommandCounter(1);
299 log.trace(logMask, "End");
300 if (locks != null) {
301 for (LockToken lock : locks) {
302 lock.release();
303 }
304 locks.clear();
305 }
306 if (store != null) {
307 if (!store.isActive()) {
308 try {
309 store.closeTrx();
310 }
311 catch (RuntimeException rex) {
312 if (exception) {
313 log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
314 }
315 else {
316 throw rex;
317 }
318 }
319 }
320 else {
321 log.warn(logMask | XLog.OPS, "transaction is not committed or rolled back before closing entitymanager.");
322 }
323 }
324 }
325 }
326
327 /**
328 * Queue a callable for execution after the current callable call invocation completes and the {@link WorkflowStore}
329 * transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are queued for a
330 * single serial execution. <p/> If the call invocation throws an exception all queued callables are discarded, they
331 * are not queued for execution.
332 *
333 * @param callable callable to queue for execution.
334 */
335 protected void queueCallable(XCallable<Void> callable) {
336 callables.add(callable);
337 }
338
339 /**
340 * Queue a list of callables for execution after the current callable call invocation completes and the {@link
341 * WorkflowStore} transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are
342 * queued for a single serial execution. <p/> If the call invocation throws an exception all queued callables are
343 * discarded, they are not queued for execution.
344 *
345 * @param callables list of callables to queue for execution.
346 */
347 protected void queueCallable(List<? extends XCallable<Void>> callables) {
348 this.callables.addAll(callables);
349 }
350
351 /**
352 * Queue a callable for delayed execution after the current callable call invocation completes and the {@link
353 * WorkflowStore} transaction commits. <p/> All queued delayed callables, regardless of the number of delay queue
354 * invocations, are queued for a single serial delayed execution with the highest delay of all queued callables.
355 * <p/> If the call invocation throws an exception all queued callables are discarded, they are not queued for
356 * execution.
357 *
358 * @param callable callable to queue for delayed execution.
359 * @param delay the queue delay in milliseconds
360 */
361 protected void queueCallable(XCallable<Void> callable, long delay) {
362 this.delayedCallables.add(callable);
363 this.delay = Math.max(this.delay, delay);
364 }
365
366 /**
367 * Queue a callable for execution only in the event of an exception being thrown during the call invocation. <p/> If
368 * an exception does not happen, all the callables queued by this method are discarded, they are not queued for
369 * execution. <p/> All queued callables, regardless of the number of queue invocations, are queued for a single
370 * serial execution.
371 *
372 * @param callable callable to queue for execution in the case of an exception.
373 */
374 protected void queueCallableForException(XCallable<Void> callable) {
375 exceptionCallables.add(callable);
376 }
377
378 /**
379 * Logging the info if failed to queue the callables.
380 *
381 * @param callables
382 */
383 protected void logQueueCallableFalse(List<? extends XCallable<Void>> callables) {
384 StringBuilder sb = new StringBuilder(
385 "Unable to queue the callables, delayedQueue is full or system is in SAFEMODE - failed to queue:[");
386 int size = callables.size();
387 for (int i = 0; i < size; i++) {
388 XCallable<Void> callable = callables.get(i);
389 sb.append(callable.getName());
390 if (i < size - 1) {
391 sb.append(", ");
392 }
393 else {
394 sb.append("]");
395 }
396 }
397 XLog.getLog(getClass()).warn(sb.toString());
398 }
399
400 /**
401 * DagCallable subclasses must implement this method to perform their task. <p/> The workflow store works in
402 * transactional mode. The transaction is committed only if this method ends successfully. Otherwise the transaction
403 * is rolledback.
404 *
405 * @param store the workflow store instance for the callable, <code>null</code> if the callable does not use a
406 * store.
407 * @return the return value of the callable.
408 * @throws StoreException thrown if the workflow store could not perform an operation.
409 * @throws CommandException thrown if the command could not perform its operation.
410 */
411 protected abstract T call(S store) throws StoreException, CommandException;
412
413 // to do
414 // need to implement on all sub commands and break down the transactions
415
416 // protected abstract T execute(String id) throws CommandException;
417
418 /**
419 * Command subclasses must implement this method correct Store can be passed to call(store);
420 *
421 * @return the Store class for use by Callable
422 * @throws CommandException thrown if the command could not perform its operation.
423 */
424 protected abstract Class<? extends Store> getStoreClass();
425
426 /**
427 * Set the log info with the context of the given coordinator bean.
428 *
429 * @param cBean coordinator bean.
430 */
431 protected void setLogInfo(CoordinatorJobBean cBean) {
432 if (logInfo.getParameter(XLogService.GROUP) == null) {
433 logInfo.setParameter(XLogService.GROUP, cBean.getGroup());
434 }
435 if (logInfo.getParameter(XLogService.USER) == null) {
436 logInfo.setParameter(XLogService.USER, cBean.getUser());
437 }
438 logInfo.setParameter(DagXLogInfoService.JOB, cBean.getId());
439 logInfo.setParameter(DagXLogInfoService.TOKEN, "");
440 logInfo.setParameter(DagXLogInfoService.APP, cBean.getAppName());
441 XLog.Info.get().setParameters(logInfo);
442 }
443
444 /**
445 * Set the log info with the context of the given coordinator action bean.
446 *
447 * @param action action bean.
448 */
449 protected void setLogInfo(CoordinatorActionBean action) {
450 logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
451 // logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
452 logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
453 XLog.Info.get().setParameters(logInfo);
454 }
455
456 /**
457 * Set the log info with the context of the given workflow bean.
458 *
459 * @param workflow workflow bean.
460 */
461 protected void setLogInfo(WorkflowJobBean workflow) {
462 if (logInfo.getParameter(XLogService.GROUP) == null) {
463 logInfo.setParameter(XLogService.GROUP, workflow.getGroup());
464 }
465 if (logInfo.getParameter(XLogService.USER) == null) {
466 logInfo.setParameter(XLogService.USER, workflow.getUser());
467 }
468 logInfo.setParameter(DagXLogInfoService.JOB, workflow.getId());
469 logInfo.setParameter(DagXLogInfoService.TOKEN, workflow.getLogToken());
470 logInfo.setParameter(DagXLogInfoService.APP, workflow.getAppName());
471 XLog.Info.get().setParameters(logInfo);
472 }
473
474 /**
475 * Set the log info with the context of the given action bean.
476 *
477 * @param action action bean.
478 */
479 protected void setLogInfo(WorkflowActionBean action) {
480 logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
481 logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
482 logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
483 XLog.Info.get().setParameters(logInfo);
484 }
485
486 /**
487 * Reset the action bean information from the log info.
488 */
489 // TODO check if they are used, else delete
490 protected void resetLogInfoAction() {
491 logInfo.clearParameter(DagXLogInfoService.ACTION);
492 XLog.Info.get().clearParameter(DagXLogInfoService.ACTION);
493 }
494
495 /**
496 * Reset the workflow bean information from the log info.
497 */
498 // TODO check if they are used, else delete
499 protected void resetLogInfoWorkflow() {
500 logInfo.clearParameter(DagXLogInfoService.JOB);
501 logInfo.clearParameter(DagXLogInfoService.APP);
502 logInfo.clearParameter(DagXLogInfoService.TOKEN);
503 XLog.Info.get().clearParameter(DagXLogInfoService.JOB);
504 XLog.Info.get().clearParameter(DagXLogInfoService.APP);
505 XLog.Info.get().clearParameter(DagXLogInfoService.TOKEN);
506 }
507
508 /**
509 * Convenience method to increment counters.
510 *
511 * @param group the group name.
512 * @param name the counter name.
513 * @param count increment count.
514 */
515 private void incrCounter(String group, String name, int count) {
516 if (instrumentation != null) {
517 instrumentation.incr(group, name, count);
518 }
519 }
520
521 /**
522 * Used to increment command counters.
523 *
524 * @param count the increment count.
525 */
526 protected void incrCommandCounter(int count) {
527 incrCounter(INSTRUMENTATION_GROUP, name, count);
528 }
529
530 /**
531 * Used to increment job counters. The counter name s the same as the command name.
532 *
533 * @param count the increment count.
534 */
535 protected void incrJobCounter(int count) {
536 incrJobCounter(name, count);
537 }
538
539 /**
540 * Used to increment job counters.
541 *
542 * @param name the job name.
543 * @param count the increment count.
544 */
545 protected void incrJobCounter(String name, int count) {
546 incrCounter(INSTRUMENTATION_JOB_GROUP, name, count);
547 }
548
549 /**
550 * Return the {@link Instrumentation} instance in use.
551 *
552 * @return the {@link Instrumentation} instance in use.
553 */
554 protected Instrumentation getInstrumentation() {
555 return instrumentation;
556 }
557
558 /**
559 * Return the identity.
560 *
561 * @return the identity.
562 */
563 @Override
564 public String toString() {
565 StringBuilder sb = new StringBuilder();
566 sb.append(getType());
567 sb.append(",").append(getPriority());
568 return sb.toString();
569 }
570
571 protected boolean lock(String id) throws InterruptedException {
572 if (id == null || id.length() == 0) {
573 XLog.getLog(getClass()).warn("lock(): Id is null or empty :" + id + ":");
574 return false;
575 }
576 LockToken token = Services.get().get(MemoryLocksService.class).getWriteLock(id, LOCK_TIMEOUT);
577 if (token != null) {
578 locks.add(token);
579 return true;
580 }
581 else {
582 return false;
583 }
584 }
585
586 /*
587 * TODO - remove store coupling to EM. Store will only contain queries
588 * protected EntityManager getEntityManager() { return
589 * store.getEntityManager(); }
590 */
591 protected T execute(S store) throws CommandException, StoreException {
592 T result = call(store);
593 return result;
594 }
595
596 /**
597 * Get command key
598 *
599 * @return command key
600 */
601 @Override
602 public String getKey(){
603 return this.key;
604 }
605
606 /**
607 * Get command lock key returning the key as an entity key, [not used] Just
608 * to be able to implement XCallable [to be deprecated]
609 *
610 * @return key
611 */
612 @Override
613 public String getEntityKey() {
614 return this.key;
615 }
616
617 /**
618 * set the mode of execution for the callable. True if in interrupt, false
619 * if not [to be deprecated]
620 */
621 public void setInterruptMode(boolean mode) {
622 }
623
624 /**
625 * [to be deprecated]
626 *
627 * @return the mode of execution. true if it is executed as an Interrupt,
628 * false otherwise
629 */
630 public boolean inInterruptMode() {
631 return false;
632 }
633
634 }