This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command;
019
020 import org.apache.oozie.ErrorCode;
021 import org.apache.oozie.FaultInjection;
022 import org.apache.oozie.XException;
023 import org.apache.oozie.service.CallableQueueService;
024 import org.apache.oozie.service.InstrumentationService;
025 import org.apache.oozie.service.MemoryLocksService;
026 import org.apache.oozie.service.Services;
027 import org.apache.oozie.util.Instrumentation;
028 import org.apache.oozie.util.MemoryLocks;
029 import org.apache.oozie.util.XCallable;
030 import org.apache.oozie.util.XLog;
031
032 import java.util.ArrayList;
033 import java.util.HashMap;
034 import java.util.List;
035 import java.util.Map;
036 import java.util.UUID;
037
038 /**
039 * Base class for synchronous and asynchronous commands.
040 * <p/>
041 * It enables by API the following pattern:
042 * <p/>
043 * <ul>
044 * <li>single execution: a command instance can be executed only once</li>
045 * <li>eager data loading: loads data for eager precondition check</li>
046 * <li>eager precondition check: verify precondition before obtaining lock</li>
047 * <li>data loading: loads data for precondition check and execution</li>
048 * <li>precondition check: verifies precondition for execution is still met</li>
049 * <li>locking: obtains exclusive lock on key before executing the command</li>
050 * <li>execution: command logic</li>
051 * </ul>
052 * <p/>
053 * It has built in instrumentation and logging.
054 */
055 public abstract class XCommand<T> implements XCallable<T> {
056 public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";
057
058 public static final String INSTRUMENTATION_GROUP = "commands";
059
060 public static final Long DEFAULT_REQUEUE_DELAY = 10L;
061
062 public XLog LOG = XLog.getLog(getClass());
063
064 private String key;
065 private String name;
066 private int priority;
067 private String type;
068 private long createdTime;
069 private MemoryLocks.LockToken lock;
070 private boolean used = false;
071
072 private Map<Long, List<XCommand<?>>> commandQueue;
073 protected boolean dryrun = false;
074 protected Instrumentation instrumentation;
075
076 protected XLog.Info logInfo;
077
078 /**
079 * Create a command.
080 *
081 * @param name command name.
082 * @param type command type.
083 * @param priority command priority.
084 */
085 public XCommand(String name, String type, int priority) {
086 this.name = name;
087 this.type = type;
088 this.priority = priority;
089 this.key = name + "_" + UUID.randomUUID();
090 createdTime = System.currentTimeMillis();
091 logInfo = new XLog.Info();
092 instrumentation = Services.get().get(InstrumentationService.class).get();
093 }
094
095 /**
096 * @param name command name.
097 * @param type command type.
098 * @param priority command priority.
099 * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
100 * really running the job
101 */
102 public XCommand(String name, String type, int priority, boolean dryrun) {
103 this(name, type, priority);
104 this.dryrun = dryrun;
105 }
106
107 /**
108 * Return the command name.
109 *
110 * @return the command name.
111 */
112 @Override
113 public String getName() {
114 return name;
115 }
116
117 /**
118 * Return the callable type.
119 * <p/>
120 * The command type is used for concurrency throttling in the {@link CallableQueueService}.
121 *
122 * @return the command type.
123 */
124 @Override
125 public String getType() {
126 return type;
127 }
128
129 /**
130 * Return the priority of the command.
131 *
132 * @return the command priority.
133 */
134 @Override
135 public int getPriority() {
136 return priority;
137 }
138
139 /**
140 * Returns the creation time of the command.
141 *
142 * @return the command creation time, in milliseconds.
143 */
144 @Override
145 public long getCreatedTime() {
146 return createdTime;
147 }
148
149 /**
150 * Queue a command for execution after the current command execution completes.
151 * <p/>
152 * All commands queued during the execution of the current command will be queued for a single serial execution.
153 * <p/>
154 * If the command execution throws an exception, no command will be effectively queued.
155 *
156 * @param command command to queue.
157 */
158 protected void queue(XCommand<?> command) {
159 queue(command, 0);
160 }
161
162 /**
163 * Queue a command for delayed execution after the current command execution completes.
164 * <p/>
165 * All commands queued during the execution of the current command with the same delay will be queued for a single
166 * serial execution.
167 * <p/>
168 * If the command execution throws an exception, no command will be effectively queued.
169 *
170 * @param command command to queue.
171 * @param msDelay delay in milliseconds.
172 */
173 protected void queue(XCommand<?> command, long msDelay) {
174 if (commandQueue == null) {
175 commandQueue = new HashMap<Long, List<XCommand<?>>>();
176 }
177 List<XCommand<?>> list = commandQueue.get(msDelay);
178 if (list == null) {
179 list = new ArrayList<XCommand<?>>();
180 commandQueue.put(msDelay, list);
181 }
182 list.add(command);
183 }
184
185 /**
186 * Obtain an exclusive lock on the {link #getEntityKey}.
187 * <p/>
188 * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
189 *
190 * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
191 * @throws CommandException thrown i the lock could not be obtained.
192 */
193 private void acquireLock() throws InterruptedException, CommandException {
194 lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
195 if (lock == null) {
196 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
197 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
198 if (isReQueueRequired()) {
199 //if not acquire the lock, re-queue itself with default delay
200 resetUsed();
201 queue(this, getRequeueDelay());
202 LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
203 } else {
204 throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
205 }
206 } else {
207 LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
208 }
209 }
210
211 /**
212 * Release the lock on the {link #getEntityKey}.
213 */
214 private void releaseLock() {
215 if (lock != null) {
216 lock.release();
217 LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
218 }
219 }
220
221 /**
222 * Implements the XCommand life-cycle.
223 *
224 * @return the {link #execute} return value.
225 * @throws Exception thrown if the command could not be executed.
226 */
227 @Override
228 public final T call() throws CommandException {
229 if (used) {
230 throw new IllegalStateException(this.getClass().getSimpleName() + " already used.");
231 }
232 used = true;
233 commandQueue = null;
234 Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
235 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
236 Instrumentation.Cron callCron = new Instrumentation.Cron();
237 try {
238 callCron.start();
239 eagerLoadState();
240 LOG = XLog.resetPrefix(LOG);
241 eagerVerifyPrecondition();
242 try {
243 T ret = null;
244 if (isLockRequired()) {
245 Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
246 acquireLockCron.start();
247 acquireLock();
248 acquireLockCron.stop();
249 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
250 }
251 if (!isLockRequired() || (isLockRequired() && lock != null)) {
252 LOG.debug("Load state for [{0}]", getEntityKey());
253 loadState();
254 LOG = XLog.resetPrefix(LOG);
255 LOG.debug("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
256 verifyPrecondition();
257 LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
258 Instrumentation.Cron executeCron = new Instrumentation.Cron();
259 executeCron.start();
260 ret = execute();
261 executeCron.stop();
262 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
263 }
264 if (commandQueue != null) {
265 CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
266 for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
267 LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
268 if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
269 LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
270 .size(), entry.getKey());
271 }
272 }
273 }
274 return ret;
275 }
276 finally {
277 if (isLockRequired()) {
278 releaseLock();
279 }
280 }
281 }
282 catch(PreconditionException pex){
283 LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
284 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
285 return null;
286 }
287 catch (XException ex) {
288 LOG.error("XException, ", ex);
289 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
290 if (ex instanceof CommandException) {
291 throw (CommandException) ex;
292 }
293 else {
294 throw new CommandException(ex);
295 }
296 }
297 catch (Exception ex) {
298 LOG.error("Exception, ", ex);
299 instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
300 throw new CommandException(ErrorCode.E0607, ex);
301 }
302 finally {
303 FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
304 callCron.stop();
305 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
306 }
307 }
308
309 /**
310 * Return the time out when acquiring a lock.
311 * <p/>
312 * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
313 * <p/>
314 * Subclasses should override this method if they want to use a different time out.
315 *
316 * @return the lock time out in milliseconds.
317 */
318 protected long getLockTimeOut() {
319 return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
320 }
321
322 /**
323 * Indicate if the the command requires locking.
324 * <p/>
325 * Subclasses should override this method if they require locking.
326 *
327 * @return <code>true/false</code>
328 */
329 protected abstract boolean isLockRequired();
330
331 /**
332 * Return the entity key for the command.
333 * <p/>
334 *
335 * @return the entity key for the command.
336 */
337 protected abstract String getEntityKey();
338
339 /**
340 * Indicate if the the command requires to requeue itself if the lock is not acquired.
341 * <p/>
342 * Subclasses should override this method if they don't want to requeue.
343 * <p/>
344 * Default is true.
345 *
346 * @return <code>true/false</code>
347 */
348 protected boolean isReQueueRequired() {
349 return true;
350 }
351
352 /**
353 * Load the necessary state to perform an eager precondition check.
354 * <p/>
355 * This implementation does a NOP.
356 * <p/>
357 * Subclasses should override this method and load the state needed to do an eager precondition check.
358 * <p/>
359 * A trivial implementation is calling {link #loadState}.
360 */
361 protected void eagerLoadState() throws CommandException{
362 }
363
364 /**
365 * Verify the precondition for the command before obtaining a lock.
366 * <p/>
367 * This implementation does a NOP.
368 * <p/>
369 * A trivial implementation is calling {link #verifyPrecondition}.
370 *
371 * @throws CommandException thrown if the precondition is not met.
372 */
373 protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
374 }
375
376 /**
377 * Load the necessary state to perform the precondition check and to execute the command.
378 * <p/>
379 * Subclasses must implement this method and load the state needed to do the precondition check and execute the
380 * command.
381 */
382 protected abstract void loadState() throws CommandException;
383
384 /**
385 * Verify the precondition for the command after a lock has been obtain, just before executing the command.
386 * <p/>
387 *
388 * @throws CommandException thrown if the precondition is not met.
389 */
390 protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
391
392 /**
393 * Command execution body.
394 * <p/>
395 * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods.
396 * <p/>
397 * If the command requires locking, this method will be invoked ONLY if the lock has been acquired.
398 *
399 * @return a return value from the execution of the command, only meaningful if the command is executed
400 * synchronously.
401 * @throws CommandException thrown if the command execution failed.
402 */
403 protected abstract T execute() throws CommandException;
404
405
406 /**
407 * Return the {@link Instrumentation} instance in use.
408 *
409 * @return the {@link Instrumentation} instance in use.
410 */
411 protected Instrumentation getInstrumentation() {
412 return instrumentation;
413 }
414
415 /**
416 * @param used set false to the used
417 */
418 public void resetUsed() {
419 this.used = false;
420 }
421
422
423 /**
424 * Return the delay time for requeue
425 *
426 * @return delay time when requeue itself
427 */
428 protected Long getRequeueDelay() {
429 return DEFAULT_REQUEUE_DELAY;
430 }
431
432 /**
433 * Get command key
434 *
435 * @return command key
436 */
437 @Override
438 public String getKey(){
439 return this.key;
440 }
441
442 /**
443 * Get XLog log
444 *
445 * @return XLog
446 */
447 public XLog getLog() {
448 return LOG;
449 }
450
451 }