001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.wf; 020 021import java.io.IOException; 022import java.net.URISyntaxException; 023import java.util.ArrayList; 024import java.util.Date; 025import java.util.List; 026 027import org.apache.oozie.ErrorCode; 028import org.apache.oozie.WorkflowActionBean; 029import org.apache.oozie.WorkflowJobBean; 030import org.apache.oozie.action.control.EndActionExecutor; 031import org.apache.oozie.action.control.ForkActionExecutor; 032import org.apache.oozie.action.control.JoinActionExecutor; 033import org.apache.oozie.action.control.KillActionExecutor; 034import org.apache.oozie.action.control.StartActionExecutor; 035import org.apache.oozie.client.WorkflowJob; 036import org.apache.oozie.command.CommandException; 037import org.apache.oozie.command.PreconditionException; 038import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; 039import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 040import org.apache.oozie.executor.jpa.BatchQueryExecutor; 041import org.apache.oozie.executor.jpa.JPAExecutorException; 042import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 043import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor; 044import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 045import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 046import org.apache.oozie.service.EventHandlerService; 047import org.apache.oozie.service.HadoopAccessorException; 048import org.apache.oozie.service.JPAService; 049import org.apache.oozie.service.Services; 050import org.apache.oozie.util.InstrumentUtils; 051import org.apache.oozie.util.LogUtils; 052import org.apache.oozie.util.ParamChecker; 053import org.apache.oozie.workflow.WorkflowException; 054import org.apache.oozie.workflow.WorkflowInstance; 055import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 056 057public class ResumeXCommand extends WorkflowXCommand<Void> { 058 059 private String id; 060 private JPAService jpaService = null; 061 private WorkflowJobBean workflow = null; 062 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 063 064 public ResumeXCommand(String id) { 065 super("resume", "resume", 1); 066 this.id = ParamChecker.notEmpty(id, "id"); 067 } 068 069 @Override 070 protected void setLogInfo() { 071 LogUtils.setLogInfo(id); 072 } 073 074 @Override 075 protected Void execute() throws CommandException { 076 try { 077 if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) { 078 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 079 workflow.getWorkflowInstance().resume(); 080 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 081 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.RUNNING); 082 workflow.setWorkflowInstance(wfInstance); 083 workflow.setStatus(WorkflowJob.Status.RUNNING); 084 085 //for (WorkflowActionBean action : store.getActionsForWorkflow(id, false)) { 086 for (WorkflowActionBean action : jpaService.execute(new WorkflowJobGetActionsJPAExecutor(id))) { 087 088 // Set pending flag to true for the actions that are START_RETRY or 089 // START_MANUAL or END_RETRY or END_MANUAL 090 if (action.isRetryOrManual()) { 091 action.setPendingOnly(); 092 updateList.add(new UpdateEntry<WorkflowActionQuery>( 093 WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action)); 094 } 095 096 if (action.isPending()) { 097 if (action.getStatus() == WorkflowActionBean.Status.PREP 098 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) { 099 // When resuming a workflow that was programatically suspended (via ActionCheckXCommand) because of 100 // a repeated transient error, we have to clean up the action dir 101 if (!action.getType().equals(StartActionExecutor.TYPE) && // The control actions have invalid 102 !action.getType().equals(ForkActionExecutor.TYPE) && // action dir paths because they 103 !action.getType().equals(JoinActionExecutor.TYPE) && // contain ":" (colons) 104 !action.getType().equals(KillActionExecutor.TYPE) && 105 !action.getType().equals(EndActionExecutor.TYPE)) { 106 ActionExecutorContext context = 107 new ActionXCommand.ActionExecutorContext(workflow, action, false, false); 108 if (context.getAppFileSystem().exists(context.getActionDir())) { 109 context.getAppFileSystem().delete(context.getActionDir(), true); 110 } 111 } 112 queue(new ActionStartXCommand(action.getId(), action.getType())); 113 } 114 else { 115 if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) { 116 Date nextRunTime = action.getPendingAge(); 117 queue(new ActionStartXCommand(action.getId(), action.getType()), 118 nextRunTime.getTime() - System.currentTimeMillis()); 119 } 120 else { 121 if (action.getStatus() == WorkflowActionBean.Status.DONE 122 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) { 123 queue(new ActionEndXCommand(action.getId(), action.getType())); 124 } 125 else { 126 if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) { 127 Date nextRunTime = action.getPendingAge(); 128 queue(new ActionEndXCommand(action.getId(), action.getType()), 129 nextRunTime.getTime() - System.currentTimeMillis()); 130 } 131 } 132 } 133 } 134 135 } 136 } 137 138 workflow.setLastModifiedTime(new Date()); 139 updateList.add(new UpdateEntry<WorkflowJobQuery>( 140 WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, workflow)); 141 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 142 if (EventHandlerService.isEnabled()) { 143 generateEvent(workflow); 144 } 145 queue(new WorkflowNotificationXCommand(workflow)); 146 } 147 return null; 148 } 149 catch (WorkflowException ex) { 150 throw new CommandException(ex); 151 } 152 catch (JPAExecutorException e) { 153 throw new CommandException(e); 154 } 155 catch (HadoopAccessorException e) { 156 throw new CommandException(e); 157 } 158 catch (IOException e) { 159 throw new CommandException(ErrorCode.E0902, e.getMessage(), e); 160 } 161 catch (URISyntaxException e) { 162 throw new CommandException(ErrorCode.E0902, e.getMessage(), e); 163 } 164 finally { 165 updateParentIfNecessary(workflow); 166 } 167 } 168 169 @Override 170 public String getEntityKey() { 171 return id; 172 } 173 174 @Override 175 public String getKey() { 176 return getName() + "_" + id; 177 } 178 179 @Override 180 protected boolean isLockRequired() { 181 return true; 182 } 183 184 @Override 185 protected void loadState() throws CommandException { 186 jpaService = Services.get().get(JPAService.class); 187 if (jpaService == null) { 188 throw new CommandException(ErrorCode.E0610); 189 } 190 try { 191 workflow = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_RESUME, id); 192 } 193 catch (JPAExecutorException e) { 194 throw new CommandException(e); 195 } 196 LogUtils.setLogInfo(workflow); 197 } 198 199 @Override 200 protected void verifyPrecondition() throws CommandException, PreconditionException { 201 if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED) { 202 throw new PreconditionException(ErrorCode.E1100, "workflow's status is " + workflow.getStatusStr() 203 + " is not SUSPENDED"); 204 } 205 } 206}