org.apache.oozie.service
Class CallableQueueService

java.lang.Object
  extended by org.apache.oozie.service.CallableQueueService
All Implemented Interfaces:
Service, Instrumentable

public class CallableQueueService
extends Object
implements Service, Instrumentable

The callable queue service queues XCallables for asynchronous execution.

Callables can be queued for immediate execution or for delayed execution (some time in the future).

Callables are consumed from the queue for execution based on their priority.
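
The priority-based consumption described above can be illustrated with a small standalone sketch. It does not use the Oozie classes: `PrioritizedTask` and `PriorityConsumptionSketch` are hypothetical names, and the rules that a larger number means higher priority and that ties are broken by insertion order are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical stand-in for a queued callable; this is not the Oozie XCallable interface.
final class PrioritizedTask {
    final String name;
    final int priority;   // assumption: a larger value means more urgent
    final long sequence;  // insertion order, used only to break ties

    PrioritizedTask(String name, int priority, long sequence) {
        this.name = name;
        this.priority = priority;
        this.sequence = sequence;
    }
}

final class PriorityConsumptionSketch {
    // Orders by descending priority, then by ascending insertion order.
    static final Comparator<PrioritizedTask> ORDER =
            Comparator.comparingInt((PrioritizedTask t) -> t.priority).reversed()
                      .thenComparingLong(t -> t.sequence);

    // Drains a priority queue and returns the task names in consumption order.
    static List<String> consumptionOrder(List<PrioritizedTask> tasks) {
        PriorityBlockingQueue<PrioritizedTask> queue =
                new PriorityBlockingQueue<>(Math.max(1, tasks.size()), ORDER);
        queue.addAll(tasks);
        List<String> order = new ArrayList<>();
        PrioritizedTask next;
        while ((next = queue.poll()) != null) {
            order.add(next.name);
        }
        return order;
    }
}
```

In this sketch, tasks queued later can overtake earlier ones when their priority is higher, which is the behaviour the sentence above describes.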

When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops queuing callables.

A thread-pool is used to execute the callables asynchronously.

The following configuration parameters control the callable queue service:

CONF_QUEUE_SIZE: size of the immediate execution queue. Default value is 10000.

CONF_THREADS: number of threads in the thread-pool used for asynchronous command execution. When this number of threads is reached, commands remain in the queue until threads become available.

The service sets up a priority queue for the execution of commands via a thread-pool, and a delayed queue to handle actions that will be ready for execution sometime in the future.
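
As a rough illustration of how a queue size and a thread count interact, the following standalone sketch pairs a bounded queue with a fixed-size thread pool. `BoundedQueueSketch` is a hypothetical class, not part of Oozie, and it uses a plain FIFO queue for brevity. Its two constructor arguments play the roles that CONF_QUEUE_SIZE and CONF_THREADS play for the real service.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a bounded queue drained by a fixed number of worker threads.
final class BoundedQueueSketch {
    private final BlockingQueue<Runnable> pending;
    private final ExecutorService workers;
    private final int threads;

    BoundedQueueSketch(int queueSize, int threads) {
        this.pending = new LinkedBlockingQueue<>(queueSize);
        this.threads = threads;
        this.workers = Executors.newFixedThreadPool(threads);
    }

    // Non-blocking: returns false instead of waiting when the queue is full.
    boolean queue(Runnable task) {
        return pending.offer(task);
    }

    int queueSize() {
        return pending.size();
    }

    // Starts one drain loop per thread, then shuts the pool down and waits.
    // The sketch cannot be reused after this call.
    void drain() {
        for (int i = 0; i < threads; i++) {
            workers.execute(() -> {
                Runnable task;
                while ((task = pending.poll()) != null) {
                    task.run();
                }
            });
        }
        workers.shutdown();
        try {
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With `new BoundedQueueSketch(2, 2)`, a third call to `queue` before `drain` returns false, mirroring the documented behaviour of refusing callables once the queue is full.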


Field Summary
static int CONCURRENCY_DELAY
           
static String CONF_CALLABLE_CONCURRENCY
           
static String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE
           
static String CONF_CALLABLE_INTERRUPT_TYPES
           
static String CONF_CALLABLE_NEXT_ELIGIBLE
           
static String CONF_PREFIX
           
static String CONF_QUEUE_SIZE
           
static String CONF_THREADS
           
static HashSet<String> INTERRUPT_TYPES
           
static int SAFE_MODE_DELAY
           
 
Fields inherited from interface org.apache.oozie.service.Service
DEFAULT_LOCK_TIMEOUT, lockTimeout, USE_XCOMMAND
 
Constructor Summary
CallableQueueService()
           
 
Method Summary
 Set<XCallable<?>> checkInterrupts(String lockKey)
          Check the interrupt map for interrupt commands. If any exist for the given lock key, the set of interrupt callables for that key is returned; otherwise null is returned.
 void checkInterruptTypes(XCallable<?> callable)
          Check whether the callable is of an interrupt type and, if so, insert it into the interrupt map.
 void destroy()
          Destroy the command queue service.
 Class<? extends Service> getInterface()
          Return the public interface for command queue service.
 List<String> getQueueDump()
          Get the queue dump as a list of strings.
 List<String> getUniqueDump()
          Get the uniqueness map dump as a list of strings.
 void init(Services services)
          Initialize the command queue service.
 void insertCallableIntoInterruptMap(XCallable<?> callable)
          Insert a new callable into the interrupt command map, either adding it to the existing list for its key or creating a new list.
 void instrument(Instrumentation instr)
          Instruments the callable queue service.
 boolean queue(XCallable<?> callable)
          Queue a callable for asynchronous execution.
 boolean queue(XCallable<?> callable, long delay)
          Queue a callable for asynchronous execution sometime in the future.
 boolean queueSerial(List<? extends XCallable<?>> callables)
          Queue a list of callables for serial execution.
 boolean queueSerial(List<? extends XCallable<?>> callables, long delay)
          Queue a list of callables for serial execution sometime in the future.
 int queueSize()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

CONF_PREFIX

public static final String CONF_PREFIX
See Also:
Constant Field Values

CONF_QUEUE_SIZE

public static final String CONF_QUEUE_SIZE
See Also:
Constant Field Values

CONF_THREADS

public static final String CONF_THREADS
See Also:
Constant Field Values

CONF_CALLABLE_CONCURRENCY

public static final String CONF_CALLABLE_CONCURRENCY
See Also:
Constant Field Values

CONF_CALLABLE_NEXT_ELIGIBLE

public static final String CONF_CALLABLE_NEXT_ELIGIBLE
See Also:
Constant Field Values

CONF_CALLABLE_INTERRUPT_TYPES

public static final String CONF_CALLABLE_INTERRUPT_TYPES
See Also:
Constant Field Values

CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE

public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE
See Also:
Constant Field Values

CONCURRENCY_DELAY

public static final int CONCURRENCY_DELAY
See Also:
Constant Field Values

SAFE_MODE_DELAY

public static final int SAFE_MODE_DELAY
See Also:
Constant Field Values

INTERRUPT_TYPES

public static final HashSet<String> INTERRUPT_TYPES
Constructor Detail

CallableQueueService

public CallableQueueService()
Method Detail

init

public void init(Services services)
Initialize the command queue service.

Specified by:
init in interface Service
Parameters:
services - services instance.

destroy

public void destroy()
Destroy the command queue service.

Specified by:
destroy in interface Service

getInterface

public Class<? extends Service> getInterface()
Return the public interface for command queue service.

Specified by:
getInterface in interface Service
Returns:
CallableQueueService.

queueSize

public int queueSize()
Returns:
the size of the queue.

queue

public boolean queue(XCallable<?> callable)
Queue a callable for asynchronous execution.

Parameters:
callable - callable to queue.
Returns:
true if the callable was queued, false if the queue is full and the callable was not queued.

queueSerial

public boolean queueSerial(List<? extends XCallable<?>> callables)
Queue a list of callables for serial execution.

Useful to serialize callables that may compete with each other for resources.

All callables will be processed with the priority of the highest priority of all callables.

Parameters:
callables - callables to be executed by the composite callable.
Returns:
true if the callables were queued, false if the queue is full and the callables were not queued.
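
The composite behaviour described above (run the callables one after another, and treat the group as having the highest priority among its members) can be sketched without the Oozie classes. `PriorityTask` and `CompositeTask` are hypothetical names, and the convention that a larger number means higher priority is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a callable that carries a priority; not the Oozie XCallable.
interface PriorityTask extends Runnable {
    int getPriority();

    // Convenience factory for building tasks from a priority and a body.
    static PriorityTask of(int priority, Runnable body) {
        return new PriorityTask() {
            @Override public int getPriority() { return priority; }
            @Override public void run() { body.run(); }
        };
    }
}

// Runs its members serially and reports the highest member priority as its own.
final class CompositeTask implements PriorityTask {
    private final List<PriorityTask> tasks;
    private final int priority;

    CompositeTask(List<? extends PriorityTask> tasks) {
        this.tasks = new ArrayList<PriorityTask>(tasks);
        int max = Integer.MIN_VALUE;
        for (PriorityTask task : this.tasks) {
            max = Math.max(max, task.getPriority());
        }
        // An empty composite gets priority 0 (an arbitrary choice for this sketch).
        this.priority = this.tasks.isEmpty() ? 0 : max;
    }

    @Override
    public int getPriority() {
        return priority;
    }

    @Override
    public void run() {
        // Serial execution: each task finishes before the next one starts.
        for (PriorityTask task : tasks) {
            task.run();
        }
    }
}
```

Because the composite is queued as a single unit, its members never run concurrently with each other, which is what makes this useful for callables that compete for the same resources.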

queue

public boolean queue(XCallable<?> callable,
                     long delay)
Queue a callable for asynchronous execution sometime in the future.

Parameters:
callable - callable to queue for delayed execution
delay - time, in milliseconds, that the callable should be delayed.
Returns:
true if the callable was queued, false if the queue is full and the callable was not queued.
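
Delayed queuing of this kind is commonly built on `java.util.concurrent.DelayQueue`. The sketch below shows the idea in isolation. `DelayedTask` and `DelaySketch` are hypothetical names, and nothing here is taken from the Oozie implementation.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element that becomes available once its delay has elapsed.
final class DelayedTask implements Delayed {
    final String name;
    private final long readyAtMillis;

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.readyAtMillis = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the task is ready.
        return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

final class DelaySketch {
    // Returns the name of a task that is ready now, or null if none is ready yet.
    static String pollReady(DelayQueue<DelayedTask> queue) {
        DelayedTask task = queue.poll();
        return task == null ? null : task.name;
    }
}
```

A task created with a delay of 0 is returned by `pollReady` immediately, while one created with a delay of 60000 stays in the queue for about a minute.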

queueSerial

public boolean queueSerial(List<? extends XCallable<?>> callables,
                           long delay)
Queue a list of callables for serial execution sometime in the future.

Useful to serialize callables that may compete with each other for resources.

All callables will be processed with the priority of the highest priority of all callables.

Parameters:
callables - callables to be executed by the composite callable.
delay - time, in milliseconds, that the callable should be delayed.
Returns:
true if the callables were queued, false if the queue is full and the callables were not queued.

instrument

public void instrument(Instrumentation instr)
Instruments the callable queue service.

Specified by:
instrument in interface Instrumentable
Parameters:
instr - instance to instrument the callable queue service to.

checkInterrupts

public Set<XCallable<?>> checkInterrupts(String lockKey)
Check the interrupt map for interrupt commands. If any exist for the given lock key, the set of interrupt callables for that key is returned; otherwise null is returned.


checkInterruptTypes

public void checkInterruptTypes(XCallable<?> callable)
Check whether the callable is of an interrupt type and, if so, insert it into the interrupt map.

Parameters:
callable - the callable to check.

insertCallableIntoInterruptMap

public void insertCallableIntoInterruptMap(XCallable<?> callable)
Insert a new callable into the interrupt command map, either adding it to the existing list for its key or creating a new list.

Parameters:
callable - the callable to insert.
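
The three interrupt-related methods above describe a map from lock key to a collection of interrupt callables. The following is a minimal standalone sketch of that data structure, not the Oozie implementation. `InterruptMapSketch` is a hypothetical class, callables are represented by plain string identifiers, and looking a key up without removing it is an assumption, since the description does not say whether entries are consumed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of an interrupt map keyed by lock key.
final class InterruptMapSketch {
    private final Map<String, Set<String>> interrupts = new HashMap<>();
    private final Set<String> interruptTypes;

    InterruptMapSketch(Set<String> interruptTypes) {
        this.interruptTypes = new HashSet<>(interruptTypes);
    }

    // Inserts the callable only when its type is a configured interrupt type.
    // Returns true if it was inserted.
    synchronized boolean checkInterruptTypes(String type, String lockKey, String callableId) {
        if (!interruptTypes.contains(type)) {
            return false;
        }
        insertIntoInterruptMap(lockKey, callableId);
        return true;
    }

    // Adds to the existing set for the key, or creates a new set.
    synchronized void insertIntoInterruptMap(String lockKey, String callableId) {
        interrupts.computeIfAbsent(lockKey, key -> new LinkedHashSet<>()).add(callableId);
    }

    // Returns the interrupts recorded for the key, or null when there are none.
    synchronized Set<String> checkInterrupts(String lockKey) {
        return interrupts.get(lockKey);
    }
}
```

Callers of `checkInterrupts` in this sketch must handle the null case explicitly, just as the method description above implies for the real service.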

getQueueDump

public List<String> getQueueDump()
Get the queue dump as a list of strings.

Returns:
the list of strings representing each CallableWrapper

getUniqueDump

public List<String> getUniqueDump()
Get the uniqueness map dump as a list of strings.

Returns:
the list of strings representing the key of each command in the queue


Copyright © 2012 Apache Software Foundation. All Rights Reserved.