001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.List;
022    
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.oozie.CoordinatorActionBean;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.WorkflowActionBean;
027    import org.apache.oozie.command.CommandException;
028    import org.apache.oozie.command.coord.CoordActionCheckCommand;
029    import org.apache.oozie.command.coord.CoordActionCheckXCommand;
030    import org.apache.oozie.command.wf.ActionCheckCommand;
031    import org.apache.oozie.command.wf.ActionCheckXCommand;
032    import org.apache.oozie.executor.jpa.CoordActionsRunningGetJPAExecutor;
033    import org.apache.oozie.executor.jpa.JPAExecutorException;
034    import org.apache.oozie.executor.jpa.WorkflowActionsRunningGetJPAExecutor;
035    import org.apache.oozie.util.XCallable;
036    import org.apache.oozie.util.XLog;
037    
/**
 * The Action Checker Service queues ActionCheckCommands to check the status of
 * running actions and CoordActionCheckCommands to check the status of
 * coordinator actions. The delay between checks on the same action can be
 * configured.
 */
044    public class ActionCheckerService implements Service {
045    
046        public static final String CONF_PREFIX = Service.CONF_PREFIX + "ActionCheckerService.";
047        /**
048         * The frequency at which the ActionCheckService will run.
049         */
050        public static final String CONF_ACTION_CHECK_INTERVAL = CONF_PREFIX + "action.check.interval";
051        /**
052         * The time, in seconds, between an ActionCheck for the same action.
053         */
054        public static final String CONF_ACTION_CHECK_DELAY = CONF_PREFIX + "action.check.delay";
055    
056        /**
057         * The number of callables to be queued in a batch.
058         */
059        public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
060    
061        protected static final String INSTRUMENTATION_GROUP = "actionchecker";
062        protected static final String INSTR_CHECK_ACTIONS_COUNTER = "checks_wf_actions";
063        protected static final String INSTR_CHECK_COORD_ACTIONS_COUNTER = "checks_coord_actions";
064    
065        private static boolean useXCommand = true;
066    
067        /**
068         * {@link ActionCheckRunnable} is the runnable which is scheduled to run and
069         * queue Action checks.
070         */
071        static class ActionCheckRunnable implements Runnable {
072            private int actionCheckDelay;
073            private List<XCallable<Void>> callables;
074            private StringBuilder msg = null;
075    
076            public ActionCheckRunnable(int actionCheckDelay) {
077                this.actionCheckDelay = actionCheckDelay;
078            }
079    
080            public void run() {
081                XLog.Info.get().clear();
082                XLog LOG = XLog.getLog(getClass());
083                msg = new StringBuilder();
084                try {
085                    runWFActionCheck();
086                    runCoordActionCheck();
087                }
088                catch (CommandException ce) {
089                    LOG.error("Unable to run action checks, ", ce);
090                }
091    
092                LOG.debug("QUEUING [{0}] for potential checking", msg.toString());
093                if (null != callables) {
094                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
095                    if (ret == false) {
096                        LOG.warn("Unable to queue the callables commands for CheckerService. "
097                                + "Most possibly command queue is full. Queue size is :"
098                                + Services.get().get(CallableQueueService.class).queueSize());
099                    }
100                    callables = null;
101                }
102            }
103    
104            /**
105             * check workflow actions
106             *
107             * @throws CommandException
108             */
109            private void runWFActionCheck() throws CommandException {
110                JPAService jpaService = Services.get().get(JPAService.class);
111                if (jpaService == null) {
112                    throw new CommandException(ErrorCode.E0610);
113                }
114    
115                List<WorkflowActionBean> actions;
116                try {
117                    actions = jpaService
118                            .execute(new WorkflowActionsRunningGetJPAExecutor(actionCheckDelay));
119                }
120                catch (JPAExecutorException je) {
121                    throw new CommandException(je);
122                }
123    
124                if (actions == null || actions.size() == 0) {
125                    return;
126                }
127                msg.append(" WF_ACTIONS : " + actions.size());
128    
129                for (WorkflowActionBean action : actions) {
130                    Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
131                            INSTR_CHECK_ACTIONS_COUNTER, 1);
132                    if (useXCommand) {
133                        queueCallable(new ActionCheckXCommand(action.getId()));
134                    }
135                    else {
136                        queueCallable(new ActionCheckCommand(action.getId()));
137                    }
138                }
139    
140            }
141    
142            /**
143             * check coordinator actions
144             *
145             * @throws CommandException
146             */
147            private void runCoordActionCheck() throws CommandException {
148                JPAService jpaService = Services.get().get(JPAService.class);
149                if (jpaService == null) {
150                    throw new CommandException(ErrorCode.E0610);
151                }
152    
153                List<CoordinatorActionBean> cactions;
154                try {
155                    cactions = jpaService.execute(new CoordActionsRunningGetJPAExecutor(
156                            actionCheckDelay));
157                }
158                catch (JPAExecutorException je) {
159                    throw new CommandException(je);
160                }
161    
162                if (cactions == null || cactions.size() == 0) {
163                    return;
164                }
165    
166                msg.append(" COORD_ACTIONS : " + cactions.size());
167    
168                for (CoordinatorActionBean caction : cactions) {
169                    Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
170                            INSTR_CHECK_COORD_ACTIONS_COUNTER, 1);
171                    if (useXCommand) {
172                        queueCallable(new CoordActionCheckXCommand(caction.getId(), actionCheckDelay));
173                    }
174                    else {
175                        queueCallable(new CoordActionCheckCommand(caction.getId(), actionCheckDelay));
176                    }
177                }
178    
179            }
180    
181            /**
182             * Adds callables to a list. If the number of callables in the list
183             * reaches {@link ActionCheckerService#CONF_CALLABLE_BATCH_SIZE}, the
184             * entire batch is queued and the callables list is reset.
185             *
186             * @param callable the callable to queue.
187             */
188            private void queueCallable(XCallable<Void> callable) {
189                if (callables == null) {
190                    callables = new ArrayList<XCallable<Void>>();
191                }
192                callables.add(callable);
193                if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
194                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
195                    if (ret == false) {
196                        XLog.getLog(getClass()).warn(
197                                "Unable to queue the callables commands for CheckerService. "
198                                        + "Most possibly command queue is full. Queue size is :"
199                                        + Services.get().get(CallableQueueService.class).queueSize());
200                    }
201                    callables = new ArrayList<XCallable<Void>>();
202                }
203            }
204        }
205    
206        /**
207         * Initializes the Action Check service.
208         *
209         * @param services services instance.
210         */
211        @Override
212        public void init(Services services) {
213            Configuration conf = services.getConf();
214            Runnable actionCheckRunnable = new ActionCheckRunnable(conf.getInt(CONF_ACTION_CHECK_DELAY, 600));
215            services.get(SchedulerService.class).schedule(actionCheckRunnable, 10,
216                    conf.getInt(CONF_ACTION_CHECK_INTERVAL, 60), SchedulerService.Unit.SEC);
217    
218            if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
219                useXCommand = false;
220            }
221    
222        }
223    
224        /**
225         * Destroy the Action Checker Services.
226         */
227        @Override
228        public void destroy() {
229        }
230    
231        /**
232         * Return the public interface for the action checker service.
233         *
234         * @return {@link ActionCheckerService}.
235         */
236        @Override
237        public Class<? extends Service> getInterface() {
238            return ActionCheckerService.class;
239        }
240    }