001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.List;
022    
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.oozie.CoordinatorActionBean;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.WorkflowActionBean;
027    import org.apache.oozie.command.CommandException;
028    import org.apache.oozie.command.coord.CoordActionCheckXCommand;
029    import org.apache.oozie.command.wf.ActionCheckXCommand;
030    import org.apache.oozie.executor.jpa.CoordActionsRunningGetJPAExecutor;
031    import org.apache.oozie.executor.jpa.JPAExecutorException;
032    import org.apache.oozie.executor.jpa.WorkflowActionsRunningGetJPAExecutor;
033    import org.apache.oozie.util.XCallable;
034    import org.apache.oozie.util.XLog;
035    
036    /**
037     * The Action Checker Service queue ActionCheckCommands to check the status of
038     * running actions and CoordActionCheckCommands to check the status of
039     * coordinator actions. The delay between checks on the same action can be
040     * configured.
041     */
042    public class ActionCheckerService implements Service {
043    
044        public static final String CONF_PREFIX = Service.CONF_PREFIX + "ActionCheckerService.";
045        /**
046         * The frequency at which the ActionCheckService will run.
047         */
048        public static final String CONF_ACTION_CHECK_INTERVAL = CONF_PREFIX + "action.check.interval";
049        /**
050         * The time, in seconds, between an ActionCheck for the same action.
051         */
052        public static final String CONF_ACTION_CHECK_DELAY = CONF_PREFIX + "action.check.delay";
053    
054        /**
055         * The number of callables to be queued in a batch.
056         */
057        public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
058    
059        protected static final String INSTRUMENTATION_GROUP = "actionchecker";
060        protected static final String INSTR_CHECK_ACTIONS_COUNTER = "checks_wf_actions";
061        protected static final String INSTR_CHECK_COORD_ACTIONS_COUNTER = "checks_coord_actions";
062    
063    
064        /**
065         * {@link ActionCheckRunnable} is the runnable which is scheduled to run and
066         * queue Action checks.
067         */
068        static class ActionCheckRunnable implements Runnable {
069            private int actionCheckDelay;
070            private List<XCallable<Void>> callables;
071            private StringBuilder msg = null;
072    
073            public ActionCheckRunnable(int actionCheckDelay) {
074                this.actionCheckDelay = actionCheckDelay;
075            }
076    
077            public void run() {
078                XLog.Info.get().clear();
079                XLog LOG = XLog.getLog(getClass());
080                msg = new StringBuilder();
081                try {
082                    runWFActionCheck();
083                    runCoordActionCheck();
084                }
085                catch (CommandException ce) {
086                    LOG.error("Unable to run action checks, ", ce);
087                }
088    
089                LOG.debug("QUEUING [{0}] for potential checking", msg.toString());
090                if (null != callables) {
091                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
092                    if (ret == false) {
093                        LOG.warn("Unable to queue the callables commands for CheckerService. "
094                                + "Most possibly command queue is full. Queue size is :"
095                                + Services.get().get(CallableQueueService.class).queueSize());
096                    }
097                    callables = null;
098                }
099            }
100    
101            /**
102             * check workflow actions
103             *
104             * @throws CommandException
105             */
106            private void runWFActionCheck() throws CommandException {
107                JPAService jpaService = Services.get().get(JPAService.class);
108                if (jpaService == null) {
109                    throw new CommandException(ErrorCode.E0610);
110                }
111    
112                List<WorkflowActionBean> actions;
113                try {
114                    actions = jpaService
115                            .execute(new WorkflowActionsRunningGetJPAExecutor(actionCheckDelay));
116                }
117                catch (JPAExecutorException je) {
118                    throw new CommandException(je);
119                }
120    
121                if (actions == null || actions.size() == 0) {
122                    return;
123                }
124                msg.append(" WF_ACTIONS : " + actions.size());
125    
126                for (WorkflowActionBean action : actions) {
127                    Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
128                            INSTR_CHECK_ACTIONS_COUNTER, 1);
129                        queueCallable(new ActionCheckXCommand(action.getId()));
130                }
131    
132            }
133    
134            /**
135             * check coordinator actions
136             *
137             * @throws CommandException
138             */
139            private void runCoordActionCheck() throws CommandException {
140                JPAService jpaService = Services.get().get(JPAService.class);
141                if (jpaService == null) {
142                    throw new CommandException(ErrorCode.E0610);
143                }
144    
145                List<String> cactionIds;
146                try {
147                    cactionIds = jpaService.execute(new CoordActionsRunningGetJPAExecutor(
148                            actionCheckDelay));
149                }
150                catch (JPAExecutorException je) {
151                    throw new CommandException(je);
152                }
153    
154                if (cactionIds == null || cactionIds.size() == 0) {
155                    return;
156                }
157    
158                msg.append(" COORD_ACTIONS : " + cactionIds.size());
159                System.out.println("MyCoordactions "+cactionIds.size());
160                for (String coordActionId : cactionIds) {
161                    Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
162                            INSTR_CHECK_COORD_ACTIONS_COUNTER, 1);
163                        queueCallable(new CoordActionCheckXCommand(coordActionId, actionCheckDelay));
164                }
165    
166            }
167    
168            /**
169             * Adds callables to a list. If the number of callables in the list
170             * reaches {@link ActionCheckerService#CONF_CALLABLE_BATCH_SIZE}, the
171             * entire batch is queued and the callables list is reset.
172             *
173             * @param callable the callable to queue.
174             */
175            private void queueCallable(XCallable<Void> callable) {
176                if (callables == null) {
177                    callables = new ArrayList<XCallable<Void>>();
178                }
179                callables.add(callable);
180                if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
181                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
182                    if (ret == false) {
183                        XLog.getLog(getClass()).warn(
184                                "Unable to queue the callables commands for CheckerService. "
185                                        + "Most possibly command queue is full. Queue size is :"
186                                        + Services.get().get(CallableQueueService.class).queueSize());
187                    }
188                    callables = new ArrayList<XCallable<Void>>();
189                }
190            }
191        }
192    
193        /**
194         * Initializes the Action Check service.
195         *
196         * @param services services instance.
197         */
198        @Override
199        public void init(Services services) {
200            Configuration conf = services.getConf();
201            Runnable actionCheckRunnable = new ActionCheckRunnable(conf.getInt(CONF_ACTION_CHECK_DELAY, 600));
202            services.get(SchedulerService.class).schedule(actionCheckRunnable, 10,
203                    conf.getInt(CONF_ACTION_CHECK_INTERVAL, 60), SchedulerService.Unit.SEC);
204        }
205    
206        /**
207         * Destroy the Action Checker Services.
208         */
209        @Override
210        public void destroy() {
211        }
212    
213        /**
214         * Return the public interface for the action checker service.
215         *
216         * @return {@link ActionCheckerService}.
217         */
218        @Override
219        public Class<? extends Service> getInterface() {
220            return ActionCheckerService.class;
221        }
222    }