001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.util.Date; 021 import java.util.List; 022 023 import org.apache.oozie.CoordinatorActionBean; 024 import org.apache.oozie.CoordinatorJobBean; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.XException; 027 import org.apache.oozie.client.CoordinatorJob; 028 import org.apache.oozie.client.Job; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.command.PreconditionException; 031 import org.apache.oozie.command.ResumeTransitionXCommand; 032 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 033 import org.apache.oozie.command.wf.ResumeXCommand; 034 import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor; 035 import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor; 036 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 037 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 038 import org.apache.oozie.executor.jpa.JPAExecutorException; 039 import org.apache.oozie.service.JPAService; 040 import org.apache.oozie.service.Services; 041 import org.apache.oozie.util.InstrumentUtils; 042 import org.apache.oozie.util.LogUtils; 043 import org.apache.oozie.util.ParamChecker; 044 045 /** 046 * Resume coordinator job and actions. 047 * 048 */ 049 public class CoordResumeXCommand extends ResumeTransitionXCommand { 050 private final String jobId; 051 private CoordinatorJobBean coordJob = null; 052 private JPAService jpaService = null; 053 private boolean exceptionOccured = false; 054 CoordinatorJob.Status prevStatus; 055 056 public CoordResumeXCommand(String id) { 057 super("coord_resume", "coord_resume", 1); 058 this.jobId = ParamChecker.notEmpty(id, "id"); 059 } 060 061 /* (non-Javadoc) 062 * @see org.apache.oozie.command.XCommand#getEntityKey() 063 */ 064 @Override 065 protected String getEntityKey() { 066 return jobId; 067 } 068 069 /* (non-Javadoc) 070 * @see org.apache.oozie.command.XCommand#isLockRequired() 071 */ 072 @Override 073 protected boolean isLockRequired() { 074 return true; 075 } 076 077 /* (non-Javadoc) 078 * @see org.apache.oozie.command.XCommand#loadState() 079 */ 080 @Override 081 protected void loadState() throws CommandException { 082 jpaService = Services.get().get(JPAService.class); 083 if (jpaService == null) { 084 throw new CommandException(ErrorCode.E0610); 085 } 086 try { 087 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 088 } 089 catch (JPAExecutorException e) { 090 throw new CommandException(e); 091 } 092 setJob(coordJob); 093 prevStatus = coordJob.getStatus(); 094 LogUtils.setLogInfo(coordJob, logInfo); 095 } 096 097 /* (non-Javadoc) 098 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 099 */ 100 @Override 101 protected void verifyPrecondition() throws CommandException, PreconditionException { 102 if (coordJob.getStatus() != CoordinatorJob.Status.SUSPENDED && coordJob.getStatus() != Job.Status.PREPSUSPENDED) { 103 throw new PreconditionException(ErrorCode.E1100, "CoordResumeXCommand not Resumed - " 104 + "job not in SUSPENDED/PREPSUSPENDED state, job = " + jobId); 105 } 106 } 107 108 /* (non-Javadoc) 109 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 110 */ 111 @Override 112 public void updateJob() throws CommandException { 113 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 114 coordJob.setSuspendedTime(null); 115 coordJob.setLastModifiedTime(new Date()); 116 LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending()); 117 try { 118 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 119 } 120 catch (JPAExecutorException e) { 121 throw new CommandException(e); 122 } 123 } 124 125 /* (non-Javadoc) 126 * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren() 127 */ 128 @Override 129 public void resumeChildren() throws CommandException { 130 try { 131 List<CoordinatorActionBean> actionList = jpaService.execute(new CoordJobGetActionsJPAExecutor(jobId)); 132 133 for (CoordinatorActionBean action : actionList) { 134 if(action.getStatus() == CoordinatorActionBean.Status.SUSPENDED){ 135 // queue a ResumeXCommand 136 if (action.getExternalId() != null) { 137 queue(new ResumeXCommand(action.getExternalId())); 138 updateCoordAction(action); 139 LOG.debug("Resume coord action = [{0}], new status = [{1}], pending = [{2}] and queue ResumeXCommand for [{3}]", 140 action.getId(), action.getStatus(), action.getPending(), action.getExternalId()); 141 }else { 142 updateCoordAction(action); 143 LOG.debug("Resume coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null", 144 action.getId(), action.getStatus(), action.getPending()); 145 } 146 } 147 } 148 } 149 catch (XException ex) { 150 exceptionOccured = true; 151 throw new CommandException(ex); 152 } 153 finally { 154 if (exceptionOccured) { 155 coordJob.setStatus(CoordinatorJob.Status.FAILED); 156 coordJob.resetPending(); 157 LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId 158 + ", status = " + coordJob.getStatus()); 159 try { 160 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 161 } 162 catch (JPAExecutorException je) { 163 LOG.error("Failed to update coordinator job : " + jobId, je); 164 } 165 } 166 } 167 } 168 169 /* (non-Javadoc) 170 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 171 */ 172 @Override 173 public void notifyParent() throws CommandException { 174 // update bundle action 175 if (this.coordJob.getBundleId() != null) { 176 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 177 bundleStatusUpdate.call(); 178 } 179 } 180 181 private void updateCoordAction(CoordinatorActionBean action) throws CommandException { 182 action.setStatus(CoordinatorActionBean.Status.RUNNING); 183 action.incrementAndGetPending(); 184 action.setLastModifiedTime(new Date()); 185 try { 186 jpaService.execute(new CoordActionUpdateJPAExecutor(action)); 187 } 188 catch (JPAExecutorException e) { 189 throw new CommandException(e); 190 } 191 } 192 193 /* (non-Javadoc) 194 * @see org.apache.oozie.command.TransitionXCommand#getJob() 195 */ 196 @Override 197 public Job getJob() { 198 return coordJob; 199 } 200 }