001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.util.ArrayList; 022import java.util.List; 023 024import org.apache.oozie.ErrorCode; 025import org.apache.oozie.WorkflowActionBean; 026import org.apache.oozie.command.CommandException; 027import org.apache.oozie.command.coord.CoordActionCheckXCommand; 028import org.apache.oozie.command.wf.ActionCheckXCommand; 029import org.apache.oozie.executor.jpa.CoordActionsRunningGetJPAExecutor; 030import org.apache.oozie.executor.jpa.JPAExecutorException; 031import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 032import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 033import org.apache.oozie.util.XCallable; 034import org.apache.oozie.util.XLog; 035 036/** 037 * The Action Checker Service queue ActionCheckCommands to check the status of 038 * running actions and CoordActionCheckCommands to check the status of 039 * coordinator actions. The delay between checks on the same action can be 040 * configured. 041 */ 042public class ActionCheckerService implements Service { 043 044 public static final String CONF_PREFIX = Service.CONF_PREFIX + "ActionCheckerService."; 045 /** 046 * The frequency at which the ActionCheckService will run. 047 */ 048 public static final String CONF_ACTION_CHECK_INTERVAL = CONF_PREFIX + "action.check.interval"; 049 /** 050 * The time, in seconds, between an ActionCheck for the same action. 051 */ 052 public static final String CONF_ACTION_CHECK_DELAY = CONF_PREFIX + "action.check.delay"; 053 054 /** 055 * The number of callables to be queued in a batch. 056 */ 057 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size"; 058 059 protected static final String INSTRUMENTATION_GROUP = "actionchecker"; 060 protected static final String INSTR_CHECK_ACTIONS_COUNTER = "checks_wf_actions"; 061 protected static final String INSTR_CHECK_COORD_ACTIONS_COUNTER = "checks_coord_actions"; 062 063 064 /** 065 * {@link ActionCheckRunnable} is the runnable which is scheduled to run and 066 * queue Action checks. 067 */ 068 static class ActionCheckRunnable implements Runnable { 069 private int actionCheckDelay; 070 private List<XCallable<Void>> callables; 071 private StringBuilder msg = null; 072 073 public ActionCheckRunnable(int actionCheckDelay) { 074 this.actionCheckDelay = actionCheckDelay; 075 } 076 077 public void run() { 078 XLog.Info.get().clear(); 079 XLog LOG = XLog.getLog(getClass()); 080 msg = new StringBuilder(); 081 try { 082 runWFActionCheck(); 083 runCoordActionCheck(); 084 } 085 catch (CommandException ce) { 086 LOG.error("Unable to run action checks, ", ce); 087 } 088 089 LOG.debug("QUEUING [{0}] for potential checking", msg.toString()); 090 if (null != callables) { 091 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 092 if (ret == false) { 093 LOG.warn("Unable to queue the callables commands for CheckerService. " 094 + "Most possibly command queue is full. Queue size is :" 095 + Services.get().get(CallableQueueService.class).queueSize()); 096 } 097 callables = null; 098 } 099 } 100 101 /** 102 * check workflow actions 103 * 104 * @throws CommandException 105 */ 106 private void runWFActionCheck() throws CommandException { 107 JPAService jpaService = Services.get().get(JPAService.class); 108 if (jpaService == null) { 109 throw new CommandException(ErrorCode.E0610); 110 } 111 112 List<WorkflowActionBean> actions; 113 try { 114 actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_RUNNING_ACTIONS, 115 actionCheckDelay); 116 } 117 catch (JPAExecutorException je) { 118 throw new CommandException(je); 119 } 120 121 if (actions == null || actions.isEmpty()) { 122 return; 123 } 124 125 List<String> actionIds = toIds(actions); 126 try { 127 actionIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(actionIds); 128 } 129 catch (Exception ex) { 130 throw new CommandException(ErrorCode.E1700, ex.getMessage(), ex); 131 } 132 133 msg.append(" WF_ACTIONS : ").append(actionIds.size()); 134 135 for (String actionId : actionIds) { 136 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP, 137 INSTR_CHECK_ACTIONS_COUNTER, 1); 138 queueCallable(new ActionCheckXCommand(actionId)); 139 } 140 141 } 142 143 /** 144 * check coordinator actions 145 * 146 * @throws CommandException 147 */ 148 private void runCoordActionCheck() throws CommandException { 149 JPAService jpaService = Services.get().get(JPAService.class); 150 if (jpaService == null) { 151 throw new CommandException(ErrorCode.E0610); 152 } 153 154 List<String> cactionIds; 155 try { 156 cactionIds = jpaService.execute(new CoordActionsRunningGetJPAExecutor( 157 actionCheckDelay)); 158 } 159 catch (JPAExecutorException je) { 160 throw new CommandException(je); 161 } 162 163 if (cactionIds == null || cactionIds.isEmpty()) { 164 return; 165 } 166 167 try { 168 cactionIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(cactionIds); 169 } 170 catch (Exception ex) { 171 throw new CommandException(ErrorCode.E1700, ex.getMessage(), ex); 172 } 173 174 msg.append(" COORD_ACTIONS : ").append(cactionIds.size()); 175 176 for (String coordActionId : cactionIds) { 177 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP, 178 INSTR_CHECK_COORD_ACTIONS_COUNTER, 1); 179 queueCallable(new CoordActionCheckXCommand(coordActionId, actionCheckDelay)); 180 } 181 182 } 183 184 /** 185 * Adds callables to a list. If the number of callables in the list 186 * reaches {@link ActionCheckerService#CONF_CALLABLE_BATCH_SIZE}, the 187 * entire batch is queued and the callables list is reset. 188 * 189 * @param callable the callable to queue. 190 */ 191 private void queueCallable(XCallable<Void> callable) { 192 if (callables == null) { 193 callables = new ArrayList<XCallable<Void>>(); 194 } 195 callables.add(callable); 196 if (callables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) { 197 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 198 if (ret == false) { 199 XLog.getLog(getClass()).warn( 200 "Unable to queue the callables commands for CheckerService. " 201 + "Most possibly command queue is full. Queue size is :" 202 + Services.get().get(CallableQueueService.class).queueSize()); 203 } 204 callables = new ArrayList<XCallable<Void>>(); 205 } 206 } 207 208 private List<String> toIds(List<WorkflowActionBean> actions) { 209 List<String> ids = new ArrayList<String>(actions.size()); 210 for (WorkflowActionBean action : actions) { 211 ids.add(action.getId()); 212 } 213 return ids; 214 } 215 } 216 217 /** 218 * Initializes the Action Check service. 219 * 220 * @param services services instance. 221 */ 222 @Override 223 public void init(Services services) { 224 Runnable actionCheckRunnable = new ActionCheckRunnable(ConfigurationService.getInt 225 (services.getConf(), CONF_ACTION_CHECK_DELAY)); 226 services.get(SchedulerService.class).schedule(actionCheckRunnable, 10, 227 ConfigurationService.getInt(services.getConf(), CONF_ACTION_CHECK_INTERVAL), 228 SchedulerService.Unit.SEC); 229 } 230 231 /** 232 * Destroy the Action Checker Services. 233 */ 234 @Override 235 public void destroy() { 236 } 237 238 /** 239 * Return the public interface for the action checker service. 240 * 241 * @return {@link ActionCheckerService}. 242 */ 243 @Override 244 public Class<? extends Service> getInterface() { 245 return ActionCheckerService.class; 246 } 247}