001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.util.ArrayList;
022import java.util.List;
023
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.WorkflowActionBean;
026import org.apache.oozie.command.CommandException;
027import org.apache.oozie.command.coord.CoordActionCheckXCommand;
028import org.apache.oozie.command.wf.ActionCheckXCommand;
029import org.apache.oozie.executor.jpa.CoordActionsRunningGetJPAExecutor;
030import org.apache.oozie.executor.jpa.JPAExecutorException;
031import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
032import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
033import org.apache.oozie.util.XCallable;
034import org.apache.oozie.util.XLog;
035
036/**
037 * The Action Checker Service queue ActionCheckCommands to check the status of
038 * running actions and CoordActionCheckCommands to check the status of
039 * coordinator actions. The delay between checks on the same action can be
040 * configured.
041 */
042public class ActionCheckerService implements Service {
043
044    public static final String CONF_PREFIX = Service.CONF_PREFIX + "ActionCheckerService.";
045    /**
046     * The frequency at which the ActionCheckService will run.
047     */
048    public static final String CONF_ACTION_CHECK_INTERVAL = CONF_PREFIX + "action.check.interval";
049    /**
050     * The time, in seconds, between an ActionCheck for the same action.
051     */
052    public static final String CONF_ACTION_CHECK_DELAY = CONF_PREFIX + "action.check.delay";
053
054    /**
055     * The number of callables to be queued in a batch.
056     */
057    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
058
059    protected static final String INSTRUMENTATION_GROUP = "actionchecker";
060    protected static final String INSTR_CHECK_ACTIONS_COUNTER = "checks_wf_actions";
061    protected static final String INSTR_CHECK_COORD_ACTIONS_COUNTER = "checks_coord_actions";
062
063
064    /**
065     * {@link ActionCheckRunnable} is the runnable which is scheduled to run and
066     * queue Action checks.
067     */
068    static class ActionCheckRunnable implements Runnable {
069        private int actionCheckDelay;
070        private List<XCallable<Void>> callables;
071        private StringBuilder msg = null;
072
073        public ActionCheckRunnable(int actionCheckDelay) {
074            this.actionCheckDelay = actionCheckDelay;
075        }
076
077        public void run() {
078            XLog.Info.get().clear();
079            XLog LOG = XLog.getLog(getClass());
080            msg = new StringBuilder();
081            try {
082                runWFActionCheck();
083                runCoordActionCheck();
084            }
085            catch (CommandException ce) {
086                LOG.error("Unable to run action checks, ", ce);
087            }
088
089            LOG.debug("QUEUING [{0}] for potential checking", msg.toString());
090            if (null != callables) {
091                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
092                if (ret == false) {
093                    LOG.warn("Unable to queue the callables commands for CheckerService. "
094                            + "Most possibly command queue is full. Queue size is :"
095                            + Services.get().get(CallableQueueService.class).queueSize());
096                }
097                callables = null;
098            }
099        }
100
101        /**
102         * check workflow actions
103         *
104         * @throws CommandException
105         */
106        private void runWFActionCheck() throws CommandException {
107            JPAService jpaService = Services.get().get(JPAService.class);
108            if (jpaService == null) {
109                throw new CommandException(ErrorCode.E0610);
110            }
111
112            List<WorkflowActionBean> actions;
113            try {
114                actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_RUNNING_ACTIONS,
115                        actionCheckDelay);
116            }
117            catch (JPAExecutorException je) {
118                throw new CommandException(je);
119            }
120
121            if (actions == null || actions.isEmpty()) {
122                return;
123            }
124
125            List<String> actionIds = toIds(actions);
126            try {
127                actionIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(actionIds);
128            }
129            catch (Exception ex) {
130                throw new CommandException(ErrorCode.E1700, ex.getMessage(), ex);
131            }
132
133            msg.append(" WF_ACTIONS : ").append(actionIds.size());
134
135            for (String actionId : actionIds) {
136                Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
137                        INSTR_CHECK_ACTIONS_COUNTER, 1);
138                    queueCallable(new ActionCheckXCommand(actionId));
139            }
140
141        }
142
143        /**
144         * check coordinator actions
145         *
146         * @throws CommandException
147         */
148        private void runCoordActionCheck() throws CommandException {
149            JPAService jpaService = Services.get().get(JPAService.class);
150            if (jpaService == null) {
151                throw new CommandException(ErrorCode.E0610);
152            }
153
154            List<String> cactionIds;
155            try {
156                cactionIds = jpaService.execute(new CoordActionsRunningGetJPAExecutor(
157                        actionCheckDelay));
158            }
159            catch (JPAExecutorException je) {
160                throw new CommandException(je);
161            }
162
163            if (cactionIds == null || cactionIds.isEmpty()) {
164                return;
165            }
166
167            try {
168                cactionIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(cactionIds);
169            }
170            catch (Exception ex) {
171                throw new CommandException(ErrorCode.E1700, ex.getMessage(), ex);
172            }
173
174            msg.append(" COORD_ACTIONS : ").append(cactionIds.size());
175
176            for (String coordActionId : cactionIds) {
177                Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
178                        INSTR_CHECK_COORD_ACTIONS_COUNTER, 1);
179                    queueCallable(new CoordActionCheckXCommand(coordActionId, actionCheckDelay));
180            }
181
182        }
183
184        /**
185         * Adds callables to a list. If the number of callables in the list
186         * reaches {@link ActionCheckerService#CONF_CALLABLE_BATCH_SIZE}, the
187         * entire batch is queued and the callables list is reset.
188         *
189         * @param callable the callable to queue.
190         */
191        private void queueCallable(XCallable<Void> callable) {
192            if (callables == null) {
193                callables = new ArrayList<XCallable<Void>>();
194            }
195            callables.add(callable);
196            if (callables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) {
197                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
198                if (ret == false) {
199                    XLog.getLog(getClass()).warn(
200                            "Unable to queue the callables commands for CheckerService. "
201                                    + "Most possibly command queue is full. Queue size is :"
202                                    + Services.get().get(CallableQueueService.class).queueSize());
203                }
204                callables = new ArrayList<XCallable<Void>>();
205            }
206        }
207
208        private List<String> toIds(List<WorkflowActionBean> actions) {
209            List<String> ids = new ArrayList<String>(actions.size());
210            for (WorkflowActionBean action : actions) {
211                ids.add(action.getId());
212            }
213            return ids;
214        }
215    }
216
217    /**
218     * Initializes the Action Check service.
219     *
220     * @param services services instance.
221     */
222    @Override
223    public void init(Services services) {
224        Runnable actionCheckRunnable = new ActionCheckRunnable(ConfigurationService.getInt
225                (services.getConf(), CONF_ACTION_CHECK_DELAY));
226        services.get(SchedulerService.class).schedule(actionCheckRunnable, 10,
227                ConfigurationService.getInt(services.getConf(), CONF_ACTION_CHECK_INTERVAL),
228                SchedulerService.Unit.SEC);
229    }
230
231    /**
232     * Destroy the Action Checker Services.
233     */
234    @Override
235    public void destroy() {
236    }
237
238    /**
239     * Return the public interface for the action checker service.
240     *
241     * @return {@link ActionCheckerService}.
242     */
243    @Override
244    public Class<? extends Service> getInterface() {
245        return ActionCheckerService.class;
246    }
247}