/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionCheckCommand;
import org.apache.oozie.command.coord.CoordActionCheckXCommand;
import org.apache.oozie.command.wf.ActionCheckCommand;
import org.apache.oozie.command.wf.ActionCheckXCommand;
import org.apache.oozie.executor.jpa.CoordActionsRunningGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsRunningGetJPAExecutor;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;

/**
 * The Action Checker Service queues ActionCheckCommands to check the status of
 * running actions and CoordActionCheckCommands to check the status of
 * coordinator actions. The delay between checks on the same action can be
 * configured.
 */
public class ActionCheckerService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "ActionCheckerService.";
    /**
     * The frequency, in seconds, at which the ActionCheckService will run.
     * Defaults to 60 when not configured.
     */
    public static final String CONF_ACTION_CHECK_INTERVAL = CONF_PREFIX + "action.check.interval";
    /**
     * The time, in seconds, between an ActionCheck for the same action.
     * Defaults to 600 when not configured.
     */
    public static final String CONF_ACTION_CHECK_DELAY = CONF_PREFIX + "action.check.delay";

    /**
     * The number of callables to be queued in a batch. Defaults to 10 when
     * not configured.
     */
    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";

    protected static final String INSTRUMENTATION_GROUP = "actionchecker";
    protected static final String INSTR_CHECK_ACTIONS_COUNTER = "checks_wf_actions";
    protected static final String INSTR_CHECK_COORD_ACTIONS_COUNTER = "checks_coord_actions";

    /**
     * Selects between the XCommand and the legacy Command flavour of the check
     * commands. Re-read from the configuration on every {@link #init(Services)}.
     */
    private static boolean useXCommand = true;

    /**
     * {@link ActionCheckRunnable} is the runnable which is scheduled to run and
     * queue Action checks.
     */
    static class ActionCheckRunnable implements Runnable {
        private final int actionCheckDelay;
        /** Callables collected so far for the current batch; null when nothing is pending. */
        private List<XCallable<Void>> callables;
        /** Summary of what was picked up during the current run, for the debug log. */
        private StringBuilder msg = null;

        /**
         * @param actionCheckDelay the delay value handed to the running-action
         *        queries and to the coordinator check commands.
         */
        public ActionCheckRunnable(int actionCheckDelay) {
            this.actionCheckDelay = actionCheckDelay;
        }

        /**
         * Runs the workflow action check followed by the coordinator action
         * check, then submits whatever callables are still pending. A
         * {@link CommandException} from either check is logged and does not
         * prevent the pending callables from being submitted.
         */
        @Override
        public void run() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            msg = new StringBuilder();
            try {
                runWFActionCheck();
                runCoordActionCheck();
            }
            catch (CommandException ce) {
                log.error("Unable to run action checks, ", ce);
            }

            log.debug("QUEUING [{0}] for potential checking", msg.toString());
            if (callables != null) {
                // Submit the last, possibly partial, batch.
                submitBatch(log);
                callables = null;
            }
        }

        /**
         * Check workflow actions: fetches the running workflow actions and
         * queues one check command per action.
         *
         * @throws CommandException with {@link ErrorCode#E0610} if no
         *         {@link JPAService} is available, or wrapping the
         *         {@link JPAExecutorException} raised by the query.
         */
        private void runWFActionCheck() throws CommandException {
            JPAService jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                throw new CommandException(ErrorCode.E0610);
            }

            List<WorkflowActionBean> actions;
            try {
                actions = jpaService.execute(new WorkflowActionsRunningGetJPAExecutor(actionCheckDelay));
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }

            if (actions == null || actions.isEmpty()) {
                return;
            }
            msg.append(" WF_ACTIONS : ").append(actions.size());

            for (WorkflowActionBean action : actions) {
                Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
                        INSTR_CHECK_ACTIONS_COUNTER, 1);
                if (useXCommand) {
                    queueCallable(new ActionCheckXCommand(action.getId()));
                }
                else {
                    queueCallable(new ActionCheckCommand(action.getId()));
                }
            }
        }

        /**
         * Check coordinator actions: fetches the running coordinator actions
         * and queues one check command per action.
         *
         * @throws CommandException with {@link ErrorCode#E0610} if no
         *         {@link JPAService} is available, or wrapping the
         *         {@link JPAExecutorException} raised by the query.
         */
        private void runCoordActionCheck() throws CommandException {
            JPAService jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                throw new CommandException(ErrorCode.E0610);
            }

            List<CoordinatorActionBean> cactions;
            try {
                cactions = jpaService.execute(new CoordActionsRunningGetJPAExecutor(actionCheckDelay));
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }

            if (cactions == null || cactions.isEmpty()) {
                return;
            }
            msg.append(" COORD_ACTIONS : ").append(cactions.size());

            for (CoordinatorActionBean caction : cactions) {
                Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
                        INSTR_CHECK_COORD_ACTIONS_COUNTER, 1);
                if (useXCommand) {
                    queueCallable(new CoordActionCheckXCommand(caction.getId(), actionCheckDelay));
                }
                else {
                    queueCallable(new CoordActionCheckCommand(caction.getId(), actionCheckDelay));
                }
            }
        }

        /**
         * Adds callables to a list. If the number of callables in the list
         * reaches {@link ActionCheckerService#CONF_CALLABLE_BATCH_SIZE}, the
         * entire batch is queued and the callables list is reset.
         *
         * @param callable the callable to queue.
         */
        private void queueCallable(XCallable<Void> callable) {
            if (callables == null) {
                callables = new ArrayList<XCallable<Void>>();
            }
            callables.add(callable);
            if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
                submitBatch(XLog.getLog(getClass()));
                callables = new ArrayList<XCallable<Void>>();
            }
        }

        /**
         * Hands the currently collected callables to the
         * {@link CallableQueueService} and logs a warning if the service
         * reports that they were not queued. Resetting {@link #callables} is
         * left to the caller.
         *
         * @param log the logger used for the warning.
         */
        private void submitBatch(XLog log) {
            CallableQueueService queueService = Services.get().get(CallableQueueService.class);
            if (!queueService.queueSerial(callables)) {
                log.warn("Unable to queue the callables commands for CheckerService. "
                        + "Most possibly command queue is full. Queue size is :"
                        + queueService.queueSize());
            }
        }
    }

    /**
     * Initializes the Action Check service: resolves which command flavour to
     * use and schedules the {@link ActionCheckRunnable} with an initial delay
     * of 10 seconds and a period of {@link #CONF_ACTION_CHECK_INTERVAL} seconds.
     *
     * @param services services instance.
     */
    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();

        // Assign unconditionally: the flag is static, so a previous initialization
        // that disabled XCommands must not leak into this one. It is also settled
        // before the runnable is handed to the scheduler.
        useXCommand = conf.getBoolean(USE_XCOMMAND, true);

        Runnable actionCheckRunnable = new ActionCheckRunnable(conf.getInt(CONF_ACTION_CHECK_DELAY, 600));
        services.get(SchedulerService.class).schedule(actionCheckRunnable, 10,
                conf.getInt(CONF_ACTION_CHECK_INTERVAL, 60), SchedulerService.Unit.SEC);
    }

    /**
     * Destroy the Action Checker Service. Nothing to release here.
     */
    @Override
    public void destroy() {
    }

    /**
     * Return the public interface for the action checker service.
     *
     * @return {@link ActionCheckerService}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return ActionCheckerService.class;
    }
}