001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.sql.Timestamp;
021    import java.util.Date;
022    
023    import org.apache.oozie.WorkflowActionBean;
024    import org.apache.oozie.WorkflowJobBean;
025    import org.apache.oozie.action.ActionExecutor;
026    import org.apache.oozie.action.ActionExecutorException;
027    import org.apache.oozie.client.WorkflowJob;
028    import org.apache.oozie.client.WorkflowAction.Status;
029    import org.apache.oozie.command.CommandException;
030    import org.apache.oozie.service.ActionService;
031    import org.apache.oozie.service.Services;
032    import org.apache.oozie.service.UUIDService;
033    import org.apache.oozie.store.StoreException;
034    import org.apache.oozie.store.WorkflowStore;
035    import org.apache.oozie.util.Instrumentation;
036    import org.apache.oozie.util.XLog;
037    
038    /**
039     * Executes the check command for ActionHandlers. </p> Ensures the action is in RUNNING state before executing {@link
040     * ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
041     */
042    public class ActionCheckCommand extends ActionCommand<Void> {
043        public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
044        private String id;
045        private String jobId;
046        private int actionCheckDelay;
047    
048        public ActionCheckCommand(String id) {
049            this(id, -1);
050        }
051    
052        public ActionCheckCommand(String id, int priority, int checkDelay) {
053            super("action.check", "action.check", priority);
054            this.id = id;
055            this.actionCheckDelay = checkDelay;
056        }
057    
058        public ActionCheckCommand(String id, int checkDelay) {
059            this(id, 0, checkDelay);
060        }
061    
062        @Override
063        protected Void call(WorkflowStore store) throws StoreException, CommandException {
064    
065            // String jobId = Services.get().get(UUIDService.class).getId(id);
066            WorkflowJobBean workflow = store.getWorkflow(jobId, false);
067            setLogInfo(workflow);
068            WorkflowActionBean action = store.getAction(id, false);
069            setLogInfo(action);
070            if (action.isPending() && action.getStatus() == WorkflowActionBean.Status.RUNNING) {
071                setLogInfo(workflow);
072                // if the action has been updated, quit this command
073                if (actionCheckDelay > 0) {
074                    Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
075                    Timestamp actionLmt = action.getLastCheckTimestamp();
076                    if (actionLmt.after(actionCheckTs)) {
077                        XLog.getLog(getClass()).debug(
078                                "The wf action :" + id + " has been udated recently. Ignoring ActionCheckCommand!");
079                        return null;
080                    }
081                }
082                if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
083                    ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(action.getType());
084                    if (executor != null) {
085                        ActionExecutorContext context = null;
086                        try {
087                            boolean isRetry = false;
088                            context = new ActionCommand.ActionExecutorContext(workflow, action, isRetry);
089                            incrActionCounter(action.getType(), 1);
090    
091                            Instrumentation.Cron cron = new Instrumentation.Cron();
092                            cron.start();
093                            executor.check(context, action);
094                            cron.stop();
095                            addActionCron(action.getType(), cron);
096    
097                            if (action.isExecutionComplete()) {
098                                if (!context.isExecuted()) {
099                                    XLog.getLog(getClass()).warn(XLog.OPS,
100                                                                 "Action Completed, ActionExecutor [{0}] must call setExecutionData()",
101                                                                 executor.getType());
102                                    action.setErrorInfo(EXEC_DATA_MISSING,
103                                                        "Execution Complete, but Execution Data Missing from Action");
104                                    failJob(context);
105                                    action.setLastCheckTime(new Date());
106                                    store.updateAction(action);
107                                    store.updateWorkflow(workflow);
108                                    return null;
109                                }
110                                action.setPending();
111                                queueCallable(new ActionEndCommand(action.getId(), action.getType()));
112                            }
113                            action.setLastCheckTime(new Date());
114                            store.updateAction(action);
115                            store.updateWorkflow(workflow);
116                        }
117                        catch (ActionExecutorException ex) {
118                            XLog.getLog(getClass()).warn(
119                                    "Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(),
120                                    ex.getMessage(), ex);
121    
122                            switch (ex.getErrorType()) {
123                                case FAILED:
124                                    failAction(workflow, action);
125                                    break;
126                            }
127                            action.setLastCheckTime(new Date());
128                            store.updateAction(action);
129                            store.updateWorkflow(workflow);
130                            return null;
131                        }
132                    }
133                }
134                else {
135                    action.setLastCheckTime(new Date());
136                    store.updateAction(action);
137                    XLog.getLog(getClass()).warn(
138                            "Action [{0}] status is running but WF Job [{1}] status is [{2}]. Expected status is RUNNING.",
139                            action.getId(), workflow.getId(), workflow.getStatus());
140                }
141            }
142            return null;
143        }
144    
145        private void failAction(WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException {
146            XLog.getLog(getClass()).warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId());
147            action.resetPending();
148            action.setStatus(Status.FAILED);
149            workflow.setStatus(WorkflowJob.Status.FAILED);
150            incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1);
151        }
152    
153        /**
154         * @param args
155         * @throws Exception
156         */
157        public static void main(String[] args) throws Exception {
158            new Services().init();
159    
160            try {
161                new ActionCheckCommand("0000001-100122154231282-oozie-dani-W@pig1").call();
162                Thread.sleep(100000);
163            }
164            finally {
165                new Services().destroy();
166            }
167        }
168    
169        @Override
170        protected Void execute(WorkflowStore store) throws CommandException, StoreException {
171            try {
172                XLog.getLog(getClass()).debug("STARTED ActionCheckCommand for wf actionId=" + id + " priority =" + getPriority());
173                jobId = Services.get().get(UUIDService.class).getId(id);
174                if (lock(jobId)) {
175                    call(store);
176                }
177                else {
178                    queueCallable(new ActionCheckCommand(id, actionCheckDelay), LOCK_FAILURE_REQUEUE_INTERVAL);
179                    XLog.getLog(getClass()).warn("ActionCheckCommand lock was not acquired - failed {0}", id);
180                }
181            }
182            catch (InterruptedException e) {
183                queueCallable(new ActionCheckCommand(id, actionCheckDelay), LOCK_FAILURE_REQUEUE_INTERVAL);
184                XLog.getLog(getClass()).warn("ActionCheckCommand lock was not acquired - interrupted exception failed {0}",
185                                             id);
186            }
187            XLog.getLog(getClass()).debug("ENDED ActionCheckCommand for wf actionId=" + id + ", jobId=" + jobId);
188            return null;
189        }
190    }