001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.util.Date; 021 import java.util.List; 022 023 import org.apache.oozie.CoordinatorActionBean; 024 import org.apache.oozie.CoordinatorJobBean; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.XException; 027 import org.apache.oozie.client.CoordinatorJob; 028 import org.apache.oozie.client.Job; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.command.PreconditionException; 031 import org.apache.oozie.command.ResumeTransitionXCommand; 032 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 033 import org.apache.oozie.command.wf.ResumeXCommand; 034 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor; 035 import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor; 036 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 037 import org.apache.oozie.executor.jpa.JPAExecutorException; 038 import org.apache.oozie.service.JPAService; 039 import org.apache.oozie.service.Services; 040 import org.apache.oozie.util.InstrumentUtils; 041 import org.apache.oozie.util.LogUtils; 042 import org.apache.oozie.util.ParamChecker; 043 044 /** 045 * Resume coordinator job and actions. 046 * 047 */ 048 public class CoordResumeXCommand extends ResumeTransitionXCommand { 049 private final String jobId; 050 private CoordinatorJobBean coordJob = null; 051 private JPAService jpaService = null; 052 private boolean exceptionOccured = false; 053 CoordinatorJob.Status prevStatus; 054 055 public CoordResumeXCommand(String id) { 056 super("coord_resume", "coord_resume", 1); 057 this.jobId = ParamChecker.notEmpty(id, "id"); 058 } 059 060 /* (non-Javadoc) 061 * @see org.apache.oozie.command.XCommand#getEntityKey() 062 */ 063 @Override 064 public String getEntityKey() { 065 return jobId; 066 } 067 068 /* (non-Javadoc) 069 * @see org.apache.oozie.command.XCommand#isLockRequired() 070 */ 071 @Override 072 protected boolean isLockRequired() { 073 return true; 074 } 075 076 /* (non-Javadoc) 077 * @see org.apache.oozie.command.XCommand#loadState() 078 */ 079 @Override 080 protected void loadState() throws CommandException { 081 jpaService = Services.get().get(JPAService.class); 082 if (jpaService == null) { 083 throw new CommandException(ErrorCode.E0610); 084 } 085 try { 086 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 087 } 088 catch (JPAExecutorException e) { 089 throw new CommandException(e); 090 } 091 setJob(coordJob); 092 prevStatus = coordJob.getStatus(); 093 LogUtils.setLogInfo(coordJob, logInfo); 094 } 095 096 /* (non-Javadoc) 097 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 098 */ 099 @Override 100 protected void verifyPrecondition() throws CommandException, PreconditionException { 101 if (coordJob.getStatus() != CoordinatorJob.Status.SUSPENDED && coordJob.getStatus() != CoordinatorJob.Status.SUSPENDEDWITHERROR && coordJob.getStatus() != Job.Status.PREPSUSPENDED) { 102 throw new PreconditionException(ErrorCode.E1100, "CoordResumeXCommand not Resumed - " 103 + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state, job = " + jobId); 104 } 105 } 106 107 /* (non-Javadoc) 108 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 109 */ 110 @Override 111 public void updateJob() { 112 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 113 coordJob.setSuspendedTime(null); 114 coordJob.setLastModifiedTime(new Date()); 115 LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " 116 + coordJob.isPending()); 117 updateList.add(coordJob); 118 } 119 120 /* (non-Javadoc) 121 * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren() 122 */ 123 @Override 124 public void resumeChildren() throws CommandException { 125 try { 126 // Get all suspended actions to resume them 127 List<CoordinatorActionBean> actionList = jpaService.execute(new CoordJobGetActionsSuspendedJPAExecutor( 128 jobId)); 129 130 for (CoordinatorActionBean action : actionList) { 131 if (action.getExternalId() != null) { 132 // queue a ResumeXCommand 133 queue(new ResumeXCommand(action.getExternalId())); 134 updateCoordAction(action); 135 LOG.debug( 136 "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and queue ResumeXCommand for [{3}]", 137 action.getId(), action.getStatus(), action.getPending(), action.getExternalId()); 138 } 139 else { 140 updateCoordAction(action); 141 LOG.debug( 142 "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null", 143 action.getId(), action.getStatus(), action.getPending()); 144 } 145 } 146 147 } 148 catch (XException ex) { 149 exceptionOccured = true; 150 throw new CommandException(ex); 151 } 152 finally { 153 if (exceptionOccured) { 154 coordJob.setStatus(CoordinatorJob.Status.FAILED); 155 coordJob.resetPending(); 156 LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = " 157 + coordJob.getStatus()); 158 updateList.add(coordJob); 159 } 160 } 161 } 162 163 /* (non-Javadoc) 164 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 165 */ 166 @Override 167 public void notifyParent() throws CommandException { 168 // update bundle action 169 if (this.coordJob.getBundleId() != null) { 170 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 171 bundleStatusUpdate.call(); 172 } 173 } 174 175 /* (non-Javadoc) 176 * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites() 177 */ 178 @Override 179 public void performWrites() throws CommandException { 180 try { 181 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null)); 182 } 183 catch (JPAExecutorException e) { 184 throw new CommandException(e); 185 } 186 } 187 188 private void updateCoordAction(CoordinatorActionBean action) { 189 action.setStatus(CoordinatorActionBean.Status.RUNNING); 190 action.incrementAndGetPending(); 191 action.setLastModifiedTime(new Date()); 192 updateList.add(action); 193 } 194 195 /* (non-Javadoc) 196 * @see org.apache.oozie.command.TransitionXCommand#getJob() 197 */ 198 @Override 199 public Job getJob() { 200 return coordJob; 201 } 202 }