/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.control.EndActionExecutor;
import org.apache.oozie.action.control.ForkActionExecutor;
import org.apache.oozie.action.control.JoinActionExecutor;
import org.apache.oozie.action.control.KillActionExecutor;
import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;

/**
 * Command that moves a {@code SUSPENDED} workflow job back to {@code RUNNING} and re-queues the
 * start/end commands of its pending actions.
 */
public class ResumeXCommand extends WorkflowXCommand<Void> {

    /** Id of the workflow job to resume; also used as the entity key of this command. */
    private String id;
    /** JPA service, looked up in {@link #loadState()}. */
    private JPAService jpaService = null;
    /** Workflow job bean, loaded in {@link #loadState()}. */
    private WorkflowJobBean workflow = null;
    /** Entity updates collected during {@link #execute()} and written in a single batch. */
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();

    /**
     * Creates a resume command for the given workflow job.
     *
     * @param id workflow job id; validated with {@link ParamChecker#notEmpty(String, String)}
     */
    public ResumeXCommand(String id) {
        super("resume", "resume", 1);
        this.id = ParamChecker.notEmpty(id, "id");
    }

    /**
     * Resumes the workflow if it is still {@code SUSPENDED}: switches the workflow instance and the job
     * bean to {@code RUNNING}, flags retry/manual actions as pending, re-queues the pending actions,
     * persists all collected updates in one batch and queues a notification.
     * <p>
     * {@code updateParentIfNecessary(workflow)} is invoked in every case, including when an exception
     * is thrown or when the workflow is not suspended.
     *
     * @return always {@code null}
     * @throws CommandException wrapping any workflow, JPA, Hadoop accessor, I/O or URI syntax failure
     */
    @Override
    protected Void execute() throws CommandException {
        try {
            if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
                return null;
            }

            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
            workflow.getWorkflowInstance().resume();
            WorkflowInstance resumedInstance = workflow.getWorkflowInstance();
            ((LiteWorkflowInstance) resumedInstance).setStatus(WorkflowInstance.Status.RUNNING);
            workflow.setWorkflowInstance(resumedInstance);
            workflow.setStatus(WorkflowJob.Status.RUNNING);

            List<WorkflowActionBean> workflowActions = jpaService.execute(new WorkflowJobGetActionsJPAExecutor(id));
            for (WorkflowActionBean action : workflowActions) {
                // Actions in START_RETRY, START_MANUAL, END_RETRY or END_MANUAL get their
                // pending flag switched on before they are considered for re-queueing.
                if (action.isRetryOrManual()) {
                    action.setPendingOnly();
                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
                }

                if (action.isPending()) {
                    requeueForResume(action);
                }
            }

            workflow.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(
                    WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, workflow));
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
            if (EventHandlerService.isEnabled()) {
                generateEvent(workflow);
            }
            queue(new NotificationXCommand(workflow));
            return null;
        }
        catch (WorkflowException ex) {
            throw new CommandException(ex);
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
        catch (HadoopAccessorException e) {
            throw new CommandException(e);
        }
        catch (IOException e) {
            throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
        }
        catch (URISyntaxException e) {
            throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
        }
        finally {
            updateParentIfNecessary(workflow);
        }
    }

    /**
     * Queues the start or end command matching the status of a pending action.
     * <ul>
     * <li>{@code PREP} / {@code START_MANUAL}: removes an existing action dir (non-control actions
     * only) and queues an immediate start.</li>
     * <li>{@code START_RETRY}: queues a start delayed until the action's pending age.</li>
     * <li>{@code DONE} / {@code END_MANUAL}: queues an immediate end.</li>
     * <li>{@code END_RETRY}: queues an end delayed until the action's pending age.</li>
     * </ul>
     * Any other status is left untouched.
     *
     * @param action pending action of the workflow being resumed
     */
    private void requeueForResume(WorkflowActionBean action)
            throws HadoopAccessorException, IOException, URISyntaxException {
        WorkflowActionBean.Status actionStatus = action.getStatus();

        if (actionStatus == WorkflowActionBean.Status.PREP
                || actionStatus == WorkflowActionBean.Status.START_MANUAL) {
            // When resuming a workflow that was programmatically suspended (via ActionCheckXCommand)
            // because of a repeated transient error, the action dir has to be cleaned up.
            if (!isControlActionType(action.getType())) {
                ActionExecutorContext context =
                        new ActionXCommand.ActionExecutorContext(workflow, action, false, false);
                if (context.getAppFileSystem().exists(context.getActionDir())) {
                    context.getAppFileSystem().delete(context.getActionDir(), true);
                }
            }
            queue(new ActionStartXCommand(action.getId(), action.getType()));
        }
        else if (actionStatus == WorkflowActionBean.Status.START_RETRY) {
            Date retryAt = action.getPendingAge();
            queue(new ActionStartXCommand(action.getId(), action.getType()),
                    retryAt.getTime() - System.currentTimeMillis());
        }
        else if (actionStatus == WorkflowActionBean.Status.DONE
                || actionStatus == WorkflowActionBean.Status.END_MANUAL) {
            queue(new ActionEndXCommand(action.getId(), action.getType()));
        }
        else if (actionStatus == WorkflowActionBean.Status.END_RETRY) {
            Date retryAt = action.getPendingAge();
            queue(new ActionEndXCommand(action.getId(), action.getType()),
                    retryAt.getTime() - System.currentTimeMillis());
        }
    }

    /**
     * Tells whether the given action type is one of the start, fork, join, kill or end control
     * actions. Those are skipped during action dir clean-up because their action dir paths are
     * invalid (they contain ":" colons).
     *
     * @param actionType type of the action
     * @return {@code true} for a control action type
     */
    private boolean isControlActionType(String actionType) {
        return actionType.equals(StartActionExecutor.TYPE)
                || actionType.equals(ForkActionExecutor.TYPE)
                || actionType.equals(JoinActionExecutor.TYPE)
                || actionType.equals(KillActionExecutor.TYPE)
                || actionType.equals(EndActionExecutor.TYPE);
    }

    /**
     * @return the id of the workflow job this command operates on
     */
    @Override
    public String getEntityKey() {
        return id;
    }

    /**
     * @return the command name and the workflow job id joined by an underscore
     */
    @Override
    public String getKey() {
        return getName() + "_" + id;
    }

    /**
     * @return always {@code true}
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Looks up the {@link JPAService} and loads the workflow job bean with the
     * {@code GET_WORKFLOW_RESUME} query, then sets the log info from the loaded bean.
     *
     * @throws CommandException with {@code E0610} when no JPA service is registered, or wrapping
     *         the {@link JPAExecutorException} raised while loading the workflow
     */
    @Override
    protected void loadState() throws CommandException {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }
        try {
            workflow = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_RESUME, id);
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
        LogUtils.setLogInfo(workflow, logInfo);
    }

    /**
     * Verifies that the loaded workflow is currently {@code SUSPENDED}.
     *
     * @throws PreconditionException with {@code E1100} when the workflow is in any other status
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        boolean suspended = workflow.getStatus() == WorkflowJob.Status.SUSPENDED;
        if (!suspended) {
            throw new PreconditionException(ErrorCode.E1100, "workflow's status is " + workflow.getStatusStr()
                    + " is not SUSPENDED");
        }
    }
}