001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.List;
022    
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.oozie.CoordinatorActionBean;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.WorkflowActionBean;
027    import org.apache.oozie.command.CommandException;
028    import org.apache.oozie.command.coord.CoordActionCheckXCommand;
029    import org.apache.oozie.command.wf.ActionCheckXCommand;
030    import org.apache.oozie.executor.jpa.CoordActionsRunningGetJPAExecutor;
031    import org.apache.oozie.executor.jpa.JPAExecutorException;
032    import org.apache.oozie.executor.jpa.WorkflowActionsRunningGetJPAExecutor;
033    import org.apache.oozie.util.XCallable;
034    import org.apache.oozie.util.XLog;
035    
036    /**
037     * The Action Checker Service queue ActionCheckCommands to check the status of
038     * running actions and CoordActionCheckCommands to check the status of
039     * coordinator actions. The delay between checks on the same action can be
040     * configured.
041     */
042    public class ActionCheckerService implements Service {
043    
044        public static final String CONF_PREFIX = Service.CONF_PREFIX + "ActionCheckerService.";
045        /**
046         * The frequency at which the ActionCheckService will run.
047         */
048        public static final String CONF_ACTION_CHECK_INTERVAL = CONF_PREFIX + "action.check.interval";
049        /**
050         * The time, in seconds, between an ActionCheck for the same action.
051         */
052        public static final String CONF_ACTION_CHECK_DELAY = CONF_PREFIX + "action.check.delay";
053    
054        /**
055         * The number of callables to be queued in a batch.
056         */
057        public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
058    
059        protected static final String INSTRUMENTATION_GROUP = "actionchecker";
060        protected static final String INSTR_CHECK_ACTIONS_COUNTER = "checks_wf_actions";
061        protected static final String INSTR_CHECK_COORD_ACTIONS_COUNTER = "checks_coord_actions";
062    
063    
064        /**
065         * {@link ActionCheckRunnable} is the runnable which is scheduled to run and
066         * queue Action checks.
067         */
068        static class ActionCheckRunnable implements Runnable {
069            private int actionCheckDelay;
070            private List<XCallable<Void>> callables;
071            private StringBuilder msg = null;
072    
073            public ActionCheckRunnable(int actionCheckDelay) {
074                this.actionCheckDelay = actionCheckDelay;
075            }
076    
077            public void run() {
078                XLog.Info.get().clear();
079                XLog LOG = XLog.getLog(getClass());
080                msg = new StringBuilder();
081                try {
082                    runWFActionCheck();
083                    runCoordActionCheck();
084                }
085                catch (CommandException ce) {
086                    LOG.error("Unable to run action checks, ", ce);
087                }
088    
089                LOG.debug("QUEUING [{0}] for potential checking", msg.toString());
090                if (null != callables) {
091                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
092                    if (ret == false) {
093                        LOG.warn("Unable to queue the callables commands for CheckerService. "
094                                + "Most possibly command queue is full. Queue size is :"
095                                + Services.get().get(CallableQueueService.class).queueSize());
096                    }
097                    callables = null;
098                }
099            }
100    
101            /**
102             * check workflow actions
103             *
104             * @throws CommandException
105             */
106            private void runWFActionCheck() throws CommandException {
107                JPAService jpaService = Services.get().get(JPAService.class);
108                if (jpaService == null) {
109                    throw new CommandException(ErrorCode.E0610);
110                }
111    
112                List<WorkflowActionBean> actions;
113                try {
114                    actions = jpaService
115                            .execute(new WorkflowActionsRunningGetJPAExecutor(actionCheckDelay));
116                }
117                catch (JPAExecutorException je) {
118                    throw new CommandException(je);
119                }
120    
121                if (actions == null || actions.size() == 0) {
122                    return;
123                }
124                msg.append(" WF_ACTIONS : " + actions.size());
125    
126                for (WorkflowActionBean action : actions) {
127                    Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
128                            INSTR_CHECK_ACTIONS_COUNTER, 1);
129                        queueCallable(new ActionCheckXCommand(action.getId()));
130                }
131    
132            }
133    
134            /**
135             * check coordinator actions
136             *
137             * @throws CommandException
138             */
139            private void runCoordActionCheck() throws CommandException {
140                JPAService jpaService = Services.get().get(JPAService.class);
141                if (jpaService == null) {
142                    throw new CommandException(ErrorCode.E0610);
143                }
144    
145                List<String> cactionIds;
146                try {
147                    cactionIds = jpaService.execute(new CoordActionsRunningGetJPAExecutor(
148                            actionCheckDelay));
149                }
150                catch (JPAExecutorException je) {
151                    throw new CommandException(je);
152                }
153    
154                if (cactionIds == null || cactionIds.size() == 0) {
155                    return;
156                }
157    
158                msg.append(" COORD_ACTIONS : " + cactionIds.size());
159                for (String coordActionId : cactionIds) {
160                    Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
161                            INSTR_CHECK_COORD_ACTIONS_COUNTER, 1);
162                        queueCallable(new CoordActionCheckXCommand(coordActionId, actionCheckDelay));
163                }
164    
165            }
166    
167            /**
168             * Adds callables to a list. If the number of callables in the list
169             * reaches {@link ActionCheckerService#CONF_CALLABLE_BATCH_SIZE}, the
170             * entire batch is queued and the callables list is reset.
171             *
172             * @param callable the callable to queue.
173             */
174            private void queueCallable(XCallable<Void> callable) {
175                if (callables == null) {
176                    callables = new ArrayList<XCallable<Void>>();
177                }
178                callables.add(callable);
179                if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
180                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
181                    if (ret == false) {
182                        XLog.getLog(getClass()).warn(
183                                "Unable to queue the callables commands for CheckerService. "
184                                        + "Most possibly command queue is full. Queue size is :"
185                                        + Services.get().get(CallableQueueService.class).queueSize());
186                    }
187                    callables = new ArrayList<XCallable<Void>>();
188                }
189            }
190        }
191    
192        /**
193         * Initializes the Action Check service.
194         *
195         * @param services services instance.
196         */
197        @Override
198        public void init(Services services) {
199            Configuration conf = services.getConf();
200            Runnable actionCheckRunnable = new ActionCheckRunnable(conf.getInt(CONF_ACTION_CHECK_DELAY, 600));
201            services.get(SchedulerService.class).schedule(actionCheckRunnable, 10,
202                    conf.getInt(CONF_ACTION_CHECK_INTERVAL, 60), SchedulerService.Unit.SEC);
203        }
204    
205        /**
206         * Destroy the Action Checker Services.
207         */
208        @Override
209        public void destroy() {
210        }
211    
212        /**
213         * Return the public interface for the action checker service.
214         *
215         * @return {@link ActionCheckerService}.
216         */
217        @Override
218        public Class<? extends Service> getInterface() {
219            return ActionCheckerService.class;
220        }
221    }