/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.util.Date;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;

/**
 * Command that resumes a suspended workflow job.
 * <p>
 * When the job is in {@code SUSPENDED} state, the command resumes its workflow instance, marks both the
 * instance and the job as {@code RUNNING}, re-queues the start/end commands of the job's pending actions,
 * persists the job and queues a notification. A coordinator action update is always issued afterwards.
 */
public class ResumeXCommand extends WorkflowXCommand<Void> {

    private final String id;
    private JPAService jpaService;
    private WorkflowJobBean workflow;

    /**
     * Creates a resume command for a workflow job.
     *
     * @param id id of the workflow job to resume; checked with {@code ParamChecker.notEmpty}.
     */
    public ResumeXCommand(String id) {
        super("resume", "resume", 1);
        this.id = ParamChecker.notEmpty(id, "id");
    }

    /**
     * Resumes the workflow job loaded by {@link #loadState()} if it is still {@code SUSPENDED}; otherwise
     * nothing is changed. In both cases, and also on failure, a {@link CoordActionUpdateXCommand} is called
     * for the workflow.
     *
     * @return always {@code null}.
     * @throws CommandException wrapping any {@link WorkflowException} or {@link JPAExecutorException} raised
     *         while resuming the instance or reading/updating the job and its actions.
     */
    @Override
    protected Void execute() throws CommandException {
        try {
            if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
                return null;
            }

            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
            workflow.getWorkflowInstance().resume();
            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
            ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.RUNNING);
            workflow.setWorkflowInstance(wfInstance);
            workflow.setStatus(WorkflowJob.Status.RUNNING);

            for (WorkflowActionBean action : jpaService.execute(new WorkflowJobGetActionsJPAExecutor(id))) {
                // Set pending flag to true for the actions that are START_RETRY or
                // START_MANUAL or END_RETRY or END_MANUAL
                if (action.isRetryOrManual()) {
                    action.setPendingOnly();
                    jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
                }

                if (action.isPending()) {
                    queuePendingAction(action);
                }
            }

            jpaService.execute(new WorkflowJobUpdateJPAExecutor(workflow));
            queue(new NotificationXCommand(workflow));
            return null;
        }
        catch (WorkflowException ex) {
            throw new CommandException(ex);
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
        finally {
            // update coordinator action
            new CoordActionUpdateXCommand(workflow).call();
        }
    }

    /**
     * Queues the start or end command matching the status of a pending action. Retry statuses are queued
     * with the delay remaining until their scheduled retry time; actions in any other status are left alone.
     *
     * @param action a pending workflow action of the job being resumed.
     */
    private void queuePendingAction(WorkflowActionBean action) {
        WorkflowActionBean.Status status = action.getStatus();

        if (status == WorkflowActionBean.Status.PREP || status == WorkflowActionBean.Status.START_MANUAL) {
            queue(new ActionStartXCommand(action.getId(), action.getType()));
        }
        else if (status == WorkflowActionBean.Status.START_RETRY) {
            queue(new ActionStartXCommand(action.getId(), action.getType()), remainingRetryDelay(action));
        }
        else if (status == WorkflowActionBean.Status.DONE || status == WorkflowActionBean.Status.END_MANUAL) {
            queue(new ActionEndXCommand(action.getId(), action.getType()));
        }
        else if (status == WorkflowActionBean.Status.END_RETRY) {
            queue(new ActionEndXCommand(action.getId(), action.getType()), remainingRetryDelay(action));
        }
    }

    /**
     * Computes how long to wait, in milliseconds, before retrying an action.
     * <p>
     * The job may have stayed suspended past the scheduled retry time, in which case the raw difference
     * would be negative; a retry time that is missing or already in the past is treated as due now.
     *
     * @param action the action whose pending age holds the next retry time.
     * @return the non-negative delay in milliseconds until the retry time.
     */
    private static long remainingRetryDelay(WorkflowActionBean action) {
        Date nextRunTime = action.getPendingAge();
        if (nextRunTime == null) {
            return 0L;
        }
        return Math.max(0L, nextRunTime.getTime() - System.currentTimeMillis());
    }

    /**
     * Returns the id of the workflow job this command operates on.
     *
     * @return the workflow job id passed to the constructor.
     */
    @Override
    protected String getEntityKey() {
        return id;
    }

    /**
     * Indicates that this command requires a lock.
     *
     * @return always {@code true}.
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Looks up the {@link JPAService}, loads the workflow job bean for the command's id and sets the log
     * info from it.
     *
     * @throws CommandException with {@link ErrorCode#E0610} if the JPA service is not available, or wrapping
     *         the {@link JPAExecutorException} raised while loading the job.
     */
    @Override
    protected void loadState() throws CommandException {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }
        try {
            workflow = jpaService.execute(new WorkflowJobGetJPAExecutor(id));
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
        LogUtils.setLogInfo(workflow, logInfo);
    }

    /**
     * Verifies that the loaded workflow job is in {@code SUSPENDED} state.
     *
     * @throws PreconditionException with {@link ErrorCode#E1100} if the job is in any other state.
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
            throw new PreconditionException(ErrorCode.E1100,
                    "workflow's status is " + workflow.getStatusStr() + " is not SUSPENDED");
        }
    }
}