/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionCheckXCommand;
import org.apache.oozie.command.wf.ActionCheckXCommand;
import org.apache.oozie.executor.jpa.CoordActionsRunningGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsRunningGetJPAExecutor;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;

/**
 * The Action Checker Service periodically queues {@link ActionCheckXCommand}s
 * to check the status of running workflow actions and
 * {@link CoordActionCheckXCommand}s to check the status of coordinator
 * actions. The delay between checks on the same action can be configured.
 */
public class ActionCheckerService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "ActionCheckerService.";
    /**
     * The frequency, in seconds, at which the ActionCheckService will run.
     */
    public static final String CONF_ACTION_CHECK_INTERVAL = CONF_PREFIX + "action.check.interval";
    /**
     * The time, in seconds, between an ActionCheck for the same action.
     */
    public static final String CONF_ACTION_CHECK_DELAY = CONF_PREFIX + "action.check.delay";

    /**
     * The number of callables to be queued in a batch.
     */
    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";

    protected static final String INSTRUMENTATION_GROUP = "actionchecker";
    protected static final String INSTR_CHECK_ACTIONS_COUNTER = "checks_wf_actions";
    protected static final String INSTR_CHECK_COORD_ACTIONS_COUNTER = "checks_coord_actions";

    /** Value used for {@link #CONF_ACTION_CHECK_DELAY} when it is not configured. */
    private static final int DEFAULT_ACTION_CHECK_DELAY = 600;
    /** Value used for {@link #CONF_ACTION_CHECK_INTERVAL} when it is not configured. */
    private static final int DEFAULT_ACTION_CHECK_INTERVAL = 60;
    /** Value used for {@link #CONF_CALLABLE_BATCH_SIZE} when it is not configured. */
    private static final int DEFAULT_CALLABLE_BATCH_SIZE = 10;

    /**
     * {@link ActionCheckRunnable} is the runnable which is scheduled to run and
     * queue Action checks.
     * <p>
     * Each run looks up the workflow actions and the coordinator actions
     * returned by the "running" JPA executors, wraps each one in a check
     * command and hands the commands to the {@link CallableQueueService} in
     * batches.
     */
    static class ActionCheckRunnable implements Runnable {
        private final int actionCheckDelay;
        /** Commands collected but not yet handed to the callable queue; {@code null} when none are pending. */
        private List<XCallable<Void>> callables;
        /** Summary of what the current run found; rebuilt at the start of every run. */
        private StringBuilder msg = null;

        /**
         * @param actionCheckDelay delay passed to the JPA executors and to
         *        {@link CoordActionCheckXCommand}; it comes from
         *        {@link ActionCheckerService#CONF_ACTION_CHECK_DELAY}.
         */
        public ActionCheckRunnable(int actionCheckDelay) {
            this.actionCheckDelay = actionCheckDelay;
        }

        /**
         * Runs the workflow action check followed by the coordinator action
         * check, then queues whatever commands are still pending.
         * <p>
         * A {@link CommandException} from either check is logged and not
         * rethrown. If the workflow check fails, the coordinator check is
         * skipped for this run; commands collected before the failure are
         * still queued.
         */
        @Override
        public void run() {
            XLog.Info.get().clear();
            XLog LOG = XLog.getLog(getClass());
            msg = new StringBuilder();
            try {
                runWFActionCheck();
                runCoordActionCheck();
            }
            catch (CommandException ce) {
                LOG.error("Unable to run action checks, ", ce);
            }

            LOG.debug("QUEUING [{0}] for potential checking", msg.toString());
            // Hand over the final, possibly partial, batch.
            flushCallables();
        }

        /**
         * Queues an {@link ActionCheckXCommand} for every workflow action
         * returned by {@link WorkflowActionsRunningGetJPAExecutor}.
         *
         * @throws CommandException if the JPA service is not available or the
         *         query fails.
         */
        private void runWFActionCheck() throws CommandException {
            JPAService jpaService = requireJpaService();

            List<WorkflowActionBean> actions;
            try {
                actions = jpaService.execute(new WorkflowActionsRunningGetJPAExecutor(actionCheckDelay));
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }

            if (actions == null || actions.isEmpty()) {
                return;
            }
            msg.append(" WF_ACTIONS : ").append(actions.size());

            // One bulk increment yields the same counter total as one
            // increment per action, without a service lookup per iteration.
            countChecks(INSTR_CHECK_ACTIONS_COUNTER, actions.size());

            int batchSize = readBatchSize();
            for (WorkflowActionBean action : actions) {
                queueCallable(new ActionCheckXCommand(action.getId()), batchSize);
            }
        }

        /**
         * Queues a {@link CoordActionCheckXCommand} for every coordinator
         * action id returned by {@link CoordActionsRunningGetJPAExecutor}.
         *
         * @throws CommandException if the JPA service is not available or the
         *         query fails.
         */
        private void runCoordActionCheck() throws CommandException {
            JPAService jpaService = requireJpaService();

            List<String> cactionIds;
            try {
                cactionIds = jpaService.execute(new CoordActionsRunningGetJPAExecutor(actionCheckDelay));
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }

            if (cactionIds == null || cactionIds.isEmpty()) {
                return;
            }
            msg.append(" COORD_ACTIONS : ").append(cactionIds.size());

            // Same bulk-increment reasoning as for workflow actions.
            countChecks(INSTR_CHECK_COORD_ACTIONS_COUNTER, cactionIds.size());

            int batchSize = readBatchSize();
            for (String coordActionId : cactionIds) {
                queueCallable(new CoordActionCheckXCommand(coordActionId, actionCheckDelay), batchSize);
            }
        }

        /**
         * Returns the JPA service.
         *
         * @throws CommandException with {@link ErrorCode#E0610} if the service
         *         is not registered.
         */
        private JPAService requireJpaService() throws CommandException {
            JPAService jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                throw new CommandException(ErrorCode.E0610);
            }
            return jpaService;
        }

        /**
         * Reads {@link ActionCheckerService#CONF_CALLABLE_BATCH_SIZE} from the
         * services configuration, falling back to 10.
         */
        private int readBatchSize() {
            return Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, DEFAULT_CALLABLE_BATCH_SIZE);
        }

        /**
         * Adds {@code count} to the named counter of the action checker
         * instrumentation group.
         */
        private void countChecks(String counterName, int count) {
            Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP, counterName, count);
        }

        /**
         * Adds a callable to the pending list. If the number of pending
         * callables reaches {@code batchSize}, the entire batch is queued and
         * the pending list is reset.
         *
         * @param callable the callable to queue.
         * @param batchSize number of callables that triggers a batch hand-over.
         */
        private void queueCallable(XCallable<Void> callable, int batchSize) {
            if (callables == null) {
                callables = new ArrayList<XCallable<Void>>();
            }
            callables.add(callable);
            if (callables.size() == batchSize) {
                flushCallables();
            }
        }

        /**
         * Hands the pending callables to the {@link CallableQueueService} as a
         * serial batch and resets the pending list. A warning is logged when
         * the queue service reports that the batch was not accepted. Nothing
         * is sent when no callables are pending.
         */
        private void flushCallables() {
            if (callables == null || callables.isEmpty()) {
                callables = null;
                return;
            }
            CallableQueueService queueService = Services.get().get(CallableQueueService.class);
            boolean queued = queueService.queueSerial(callables);
            if (!queued) {
                XLog.getLog(getClass()).warn(
                        "Unable to queue the callables commands for CheckerService. "
                                + "Most possibly command queue is full. Queue size is :"
                                + queueService.queueSize());
            }
            // Drop the reference either way: a rejected batch is not retried.
            callables = null;
        }
    }

    /**
     * Initializes the Action Check service by scheduling an
     * {@link ActionCheckRunnable} with the {@link SchedulerService}.
     * <p>
     * The check delay defaults to 600 and the check interval to 60 when not
     * configured; the schedule is expressed in seconds.
     *
     * @param services services instance.
     */
    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();
        int checkDelay = conf.getInt(CONF_ACTION_CHECK_DELAY, DEFAULT_ACTION_CHECK_DELAY);
        int checkInterval = conf.getInt(CONF_ACTION_CHECK_INTERVAL, DEFAULT_ACTION_CHECK_INTERVAL);
        Runnable actionCheckRunnable = new ActionCheckRunnable(checkDelay);
        // NOTE(review): the literal 10 is presumably the initial delay before
        // the first run -- confirm against SchedulerService.schedule.
        services.get(SchedulerService.class).schedule(actionCheckRunnable, 10, checkInterval,
                SchedulerService.Unit.SEC);
    }

    /**
     * Destroy the Action Checker Services. This implementation does nothing.
     */
    @Override
    public void destroy() {
    }

    /**
     * Return the public interface for the action checker service.
     *
     * @return {@link ActionCheckerService}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return ActionCheckerService.class;
    }
}