001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.io.IOException;
021    import java.net.URISyntaxException;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.List;
025    
026    import org.apache.oozie.ErrorCode;
027    import org.apache.oozie.WorkflowActionBean;
028    import org.apache.oozie.WorkflowJobBean;
029    import org.apache.oozie.action.control.EndActionExecutor;
030    import org.apache.oozie.action.control.ForkActionExecutor;
031    import org.apache.oozie.action.control.JoinActionExecutor;
032    import org.apache.oozie.action.control.KillActionExecutor;
033    import org.apache.oozie.action.control.StartActionExecutor;
034    import org.apache.oozie.client.WorkflowJob;
035    import org.apache.oozie.client.rest.JsonBean;
036    import org.apache.oozie.command.CommandException;
037    import org.apache.oozie.command.PreconditionException;
038    import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
039    import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
040    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
041    import org.apache.oozie.executor.jpa.JPAExecutorException;
042    import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
043    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
044    import org.apache.oozie.service.EventHandlerService;
045    import org.apache.oozie.service.HadoopAccessorException;
046    import org.apache.oozie.service.JPAService;
047    import org.apache.oozie.service.Services;
048    import org.apache.oozie.util.InstrumentUtils;
049    import org.apache.oozie.util.LogUtils;
050    import org.apache.oozie.util.ParamChecker;
051    import org.apache.oozie.workflow.WorkflowException;
052    import org.apache.oozie.workflow.WorkflowInstance;
053    import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
054    
055    public class ResumeXCommand extends WorkflowXCommand<Void> {
056    
057        private String id;
058        private JPAService jpaService = null;
059        private WorkflowJobBean workflow = null;
060        private List<JsonBean> updateList = new ArrayList<JsonBean>();
061    
062        public ResumeXCommand(String id) {
063            super("resume", "resume", 1);
064            this.id = ParamChecker.notEmpty(id, "id");
065        }
066    
067        @Override
068        protected Void execute() throws CommandException {
069            try {
070                if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) {
071                    InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
072                    workflow.getWorkflowInstance().resume();
073                    WorkflowInstance wfInstance = workflow.getWorkflowInstance();
074                    ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.RUNNING);
075                    workflow.setWorkflowInstance(wfInstance);
076                    workflow.setStatus(WorkflowJob.Status.RUNNING);
077    
078                    //for (WorkflowActionBean action : store.getActionsForWorkflow(id, false)) {
079                    for (WorkflowActionBean action : jpaService.execute(new WorkflowJobGetActionsJPAExecutor(id))) {
080    
081                        // Set pending flag to true for the actions that are START_RETRY or
082                        // START_MANUAL or END_RETRY or END_MANUAL
083                        if (action.isRetryOrManual()) {
084                            action.setPendingOnly();
085                            updateList.add(action);
086                        }
087    
088                        if (action.isPending()) {
089                            if (action.getStatus() == WorkflowActionBean.Status.PREP
090                                    || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
091                                // When resuming a workflow that was programatically suspended (via ActionCheckXCommand) because of
092                                // a repeated transient error, we have to clean up the action dir
093                                if (!action.getType().equals(StartActionExecutor.TYPE) &&       // The control actions have invalid
094                                    !action.getType().equals(ForkActionExecutor.TYPE) &&        // action dir paths because they
095                                    !action.getType().equals(JoinActionExecutor.TYPE) &&        // contain ":" (colons)
096                                    !action.getType().equals(KillActionExecutor.TYPE) &&
097                                    !action.getType().equals(EndActionExecutor.TYPE)) {
098                                    ActionExecutorContext context =
099                                            new ActionXCommand.ActionExecutorContext(workflow, action, false, false);
100                                    if (context.getAppFileSystem().exists(context.getActionDir())) {
101                                        context.getAppFileSystem().delete(context.getActionDir(), true);
102                                    }
103                                }
104                                queue(new ActionStartXCommand(action.getId(), action.getType()));
105                            }
106                            else {
107                                if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
108                                    Date nextRunTime = action.getPendingAge();
109                                    queue(new ActionStartXCommand(action.getId(), action.getType()),
110                                                  nextRunTime.getTime() - System.currentTimeMillis());
111                                }
112                                else {
113                                    if (action.getStatus() == WorkflowActionBean.Status.DONE
114                                            || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
115                                        queue(new ActionEndXCommand(action.getId(), action.getType()));
116                                    }
117                                    else {
118                                        if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
119                                            Date nextRunTime = action.getPendingAge();
120                                            queue(new ActionEndXCommand(action.getId(), action.getType()),
121                                                          nextRunTime.getTime() - System.currentTimeMillis());
122                                        }
123                                    }
124                                }
125                            }
126    
127                        }
128                    }
129    
130                    workflow.setLastModifiedTime(new Date());
131                    updateList.add(workflow);
132                    jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
133                    if (EventHandlerService.isEnabled()) {
134                        generateEvent(workflow);
135                    }
136                    queue(new NotificationXCommand(workflow));
137                }
138                return null;
139            }
140            catch (WorkflowException ex) {
141                throw new CommandException(ex);
142            }
143            catch (JPAExecutorException e) {
144                throw new CommandException(e);
145            }
146            catch (HadoopAccessorException e) {
147                throw new CommandException(e);
148            }
149            catch (IOException e) {
150                throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
151            }
152            catch (URISyntaxException e) {
153                throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
154            }
155            finally {
156                // update coordinator action
157                new CoordActionUpdateXCommand(workflow).call();
158            }
159        }
160    
161        @Override
162        public String getEntityKey() {
163            return id;
164        }
165    
166        @Override
167        public String getKey() {
168            return getName() + "_" + id;
169        }
170    
171        @Override
172        protected boolean isLockRequired() {
173            return true;
174        }
175    
176        @Override
177        protected void loadState() throws CommandException {
178            jpaService = Services.get().get(JPAService.class);
179            if (jpaService == null) {
180                throw new CommandException(ErrorCode.E0610);
181            }
182            try {
183                workflow = jpaService.execute(new WorkflowJobGetJPAExecutor(id));
184            }
185            catch (JPAExecutorException e) {
186                throw new CommandException(e);
187            }
188            LogUtils.setLogInfo(workflow, logInfo);
189        }
190    
191        @Override
192        protected void verifyPrecondition() throws CommandException, PreconditionException {
193            if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
194                throw new PreconditionException(ErrorCode.E1100, "workflow's status is " + workflow.getStatusStr() + " is not SUSPENDED");
195            }
196        }
197    }