001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.util.Date; 021 import java.util.List; 022 023 import org.apache.oozie.CoordinatorActionBean; 024 import org.apache.oozie.CoordinatorJobBean; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.XException; 027 import org.apache.oozie.client.CoordinatorJob; 028 import org.apache.oozie.client.Job; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.command.PreconditionException; 031 import org.apache.oozie.command.ResumeTransitionXCommand; 032 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 033 import org.apache.oozie.command.wf.ResumeXCommand; 034 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor; 035 import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor; 036 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 037 import org.apache.oozie.executor.jpa.JPAExecutorException; 038 import org.apache.oozie.service.JPAService; 039 import org.apache.oozie.service.Services; 040 import org.apache.oozie.util.InstrumentUtils; 041 import org.apache.oozie.util.LogUtils; 042 import org.apache.oozie.util.ParamChecker; 043 044 /** 045 * Resume coordinator job and actions. 046 * 047 */ 048 public class CoordResumeXCommand extends ResumeTransitionXCommand { 049 private final String jobId; 050 private CoordinatorJobBean coordJob = null; 051 private JPAService jpaService = null; 052 private boolean exceptionOccured = false; 053 CoordinatorJob.Status prevStatus; 054 055 public CoordResumeXCommand(String id) { 056 super("coord_resume", "coord_resume", 1); 057 this.jobId = ParamChecker.notEmpty(id, "id"); 058 } 059 060 @Override 061 public String getEntityKey() { 062 return jobId; 063 } 064 065 @Override 066 public String getKey() { 067 return getName() + "_" + this.jobId; 068 } 069 070 @Override 071 protected boolean isLockRequired() { 072 return true; 073 } 074 075 @Override 076 protected void loadState() throws CommandException { 077 jpaService = Services.get().get(JPAService.class); 078 if (jpaService == null) { 079 throw new CommandException(ErrorCode.E0610); 080 } 081 try { 082 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 083 } 084 catch (JPAExecutorException e) { 085 throw new CommandException(e); 086 } 087 setJob(coordJob); 088 prevStatus = coordJob.getStatus(); 089 LogUtils.setLogInfo(coordJob, logInfo); 090 } 091 092 @Override 093 protected void verifyPrecondition() throws CommandException, PreconditionException { 094 if (coordJob.getStatus() != CoordinatorJob.Status.SUSPENDED && coordJob.getStatus() != CoordinatorJob.Status.SUSPENDEDWITHERROR && coordJob.getStatus() != Job.Status.PREPSUSPENDED) { 095 throw new PreconditionException(ErrorCode.E1100, "CoordResumeXCommand not Resumed - " 096 + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state, job = " + jobId); 097 } 098 } 099 100 @Override 101 public void updateJob() { 102 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 103 coordJob.setSuspendedTime(null); 104 coordJob.setLastModifiedTime(new Date()); 105 LOG.debug("Resume coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " 106 + coordJob.isPending()); 107 updateList.add(coordJob); 108 } 109 110 @Override 111 public void resumeChildren() throws CommandException { 112 try { 113 // Get all suspended actions to resume them 114 List<CoordinatorActionBean> actionList = jpaService.execute(new CoordJobGetActionsSuspendedJPAExecutor( 115 jobId)); 116 117 for (CoordinatorActionBean action : actionList) { 118 if (action.getExternalId() != null) { 119 // queue a ResumeXCommand 120 queue(new ResumeXCommand(action.getExternalId())); 121 updateCoordAction(action); 122 LOG.debug( 123 "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and queue ResumeXCommand for [{3}]", 124 action.getId(), action.getStatus(), action.getPending(), action.getExternalId()); 125 } 126 else { 127 updateCoordAction(action); 128 LOG.debug( 129 "Resume coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null", 130 action.getId(), action.getStatus(), action.getPending()); 131 } 132 } 133 134 } 135 catch (XException ex) { 136 exceptionOccured = true; 137 throw new CommandException(ex); 138 } 139 finally { 140 if (exceptionOccured) { 141 coordJob.setStatus(CoordinatorJob.Status.FAILED); 142 coordJob.resetPending(); 143 LOG.warn("Resume children failed so fail coordinator, coordinator job id = " + jobId + ", status = " 144 + coordJob.getStatus()); 145 updateList.add(coordJob); 146 } 147 } 148 } 149 150 @Override 151 public void notifyParent() throws CommandException { 152 // update bundle action 153 if (this.coordJob.getBundleId() != null) { 154 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 155 bundleStatusUpdate.call(); 156 } 157 } 158 159 @Override 160 public void performWrites() throws CommandException { 161 try { 162 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null)); 163 } 164 catch (JPAExecutorException e) { 165 throw new CommandException(e); 166 } 167 } 168 169 private void updateCoordAction(CoordinatorActionBean action) { 170 action.setStatus(CoordinatorActionBean.Status.RUNNING); 171 action.incrementAndGetPending(); 172 action.setLastModifiedTime(new Date()); 173 updateList.add(action); 174 } 175 176 @Override 177 public Job getJob() { 178 return coordJob; 179 } 180 }