001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.io.IOException; 021 import java.net.URISyntaxException; 022 import java.util.ArrayList; 023 import java.util.Date; 024 import java.util.List; 025 026 import org.apache.oozie.ErrorCode; 027 import org.apache.oozie.WorkflowActionBean; 028 import org.apache.oozie.WorkflowJobBean; 029 import org.apache.oozie.action.control.EndActionExecutor; 030 import org.apache.oozie.action.control.ForkActionExecutor; 031 import org.apache.oozie.action.control.JoinActionExecutor; 032 import org.apache.oozie.action.control.KillActionExecutor; 033 import org.apache.oozie.action.control.StartActionExecutor; 034 import org.apache.oozie.client.WorkflowJob; 035 import org.apache.oozie.client.rest.JsonBean; 036 import org.apache.oozie.command.CommandException; 037 import org.apache.oozie.command.PreconditionException; 038 import org.apache.oozie.command.coord.CoordActionUpdateXCommand; 039 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; 040 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 041 import org.apache.oozie.executor.jpa.JPAExecutorException; 042 import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor; 043 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 044 import org.apache.oozie.service.HadoopAccessorException; 045 import org.apache.oozie.service.JPAService; 046 import org.apache.oozie.service.Services; 047 import org.apache.oozie.util.InstrumentUtils; 048 import org.apache.oozie.util.LogUtils; 049 import org.apache.oozie.util.ParamChecker; 050 import org.apache.oozie.workflow.WorkflowException; 051 import org.apache.oozie.workflow.WorkflowInstance; 052 import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 053 054 public class ResumeXCommand extends WorkflowXCommand<Void> { 055 056 private String id; 057 private JPAService jpaService = null; 058 private WorkflowJobBean workflow = null; 059 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 060 061 public ResumeXCommand(String id) { 062 super("resume", "resume", 1); 063 this.id = ParamChecker.notEmpty(id, "id"); 064 } 065 066 @Override 067 protected Void execute() throws CommandException { 068 try { 069 if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) { 070 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 071 workflow.getWorkflowInstance().resume(); 072 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 073 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.RUNNING); 074 workflow.setWorkflowInstance(wfInstance); 075 workflow.setStatus(WorkflowJob.Status.RUNNING); 076 077 078 //for (WorkflowActionBean action : store.getActionsForWorkflow(id, false)) { 079 for (WorkflowActionBean action : jpaService.execute(new WorkflowJobGetActionsJPAExecutor(id))) { 080 081 // Set pending flag to true for the actions that are START_RETRY or 082 // START_MANUAL or END_RETRY or END_MANUAL 083 if (action.isRetryOrManual()) { 084 action.setPendingOnly(); 085 updateList.add(action); 086 } 087 088 if (action.isPending()) { 089 if (action.getStatus() == WorkflowActionBean.Status.PREP 090 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) { 091 // When resuming a workflow that was programatically suspended (via ActionCheckXCommand) because of 092 // a repeated transient error, we have to clean up the action dir 093 if (!action.getType().equals(StartActionExecutor.TYPE) && // The control actions have invalid 094 !action.getType().equals(ForkActionExecutor.TYPE) && // action dir paths because they 095 !action.getType().equals(JoinActionExecutor.TYPE) && // contain ":" (colons) 096 !action.getType().equals(KillActionExecutor.TYPE) && 097 !action.getType().equals(EndActionExecutor.TYPE)) { 098 ActionExecutorContext context = 099 new ActionXCommand.ActionExecutorContext(workflow, action, false, false); 100 if (context.getAppFileSystem().exists(context.getActionDir())) { 101 context.getAppFileSystem().delete(context.getActionDir(), true); 102 } 103 } 104 queue(new ActionStartXCommand(action.getId(), action.getType())); 105 } 106 else { 107 if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) { 108 Date nextRunTime = action.getPendingAge(); 109 queue(new ActionStartXCommand(action.getId(), action.getType()), 110 nextRunTime.getTime() - System.currentTimeMillis()); 111 } 112 else { 113 if (action.getStatus() == WorkflowActionBean.Status.DONE 114 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) { 115 queue(new ActionEndXCommand(action.getId(), action.getType())); 116 } 117 else { 118 if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) { 119 Date nextRunTime = action.getPendingAge(); 120 queue(new ActionEndXCommand(action.getId(), action.getType()), 121 nextRunTime.getTime() - System.currentTimeMillis()); 122 } 123 } 124 } 125 } 126 127 } 128 } 129 130 workflow.setLastModifiedTime(new Date()); 131 updateList.add(workflow); 132 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null)); 133 queue(new NotificationXCommand(workflow)); 134 } 135 return null; 136 } 137 catch (WorkflowException ex) { 138 throw new CommandException(ex); 139 } 140 catch (JPAExecutorException e) { 141 throw new CommandException(e); 142 } 143 catch (HadoopAccessorException e) { 144 throw new CommandException(e); 145 } 146 catch (IOException e) { 147 throw new CommandException(ErrorCode.E0902, e.getMessage(), e); 148 } 149 catch (URISyntaxException e) { 150 throw new CommandException(ErrorCode.E0902, e.getMessage(), e); 151 } 152 finally { 153 // update coordinator action 154 new CoordActionUpdateXCommand(workflow).call(); 155 } 156 } 157 158 @Override 159 public String getEntityKey() { 160 return id; 161 } 162 163 @Override 164 protected boolean isLockRequired() { 165 return true; 166 } 167 168 @Override 169 protected void loadState() throws CommandException { 170 jpaService = Services.get().get(JPAService.class); 171 if (jpaService == null) { 172 throw new CommandException(ErrorCode.E0610); 173 } 174 try { 175 workflow = jpaService.execute(new WorkflowJobGetJPAExecutor(id)); 176 } 177 catch (JPAExecutorException e) { 178 throw new CommandException(e); 179 } 180 LogUtils.setLogInfo(workflow, logInfo); 181 } 182 183 @Override 184 protected void verifyPrecondition() throws CommandException, PreconditionException { 185 if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED) { 186 throw new PreconditionException(ErrorCode.E1100, "workflow's status is " + workflow.getStatusStr() + " is not SUSPENDED"); 187 } 188 } 189 }