001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.sql.Timestamp; 021 import java.util.Date; 022 023 import org.apache.oozie.CoordinatorActionBean; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.WorkflowJobBean; 026 import org.apache.oozie.XException; 027 import org.apache.oozie.service.JPAService; 028 import org.apache.oozie.service.Services; 029 import org.apache.oozie.util.InstrumentUtils; 030 import org.apache.oozie.util.LogUtils; 031 import org.apache.oozie.util.ParamChecker; 032 import org.apache.oozie.util.db.SLADbOperations; 033 import org.apache.oozie.client.CoordinatorAction; 034 import org.apache.oozie.client.WorkflowJob; 035 import org.apache.oozie.client.SLAEvent.SlaAppType; 036 import org.apache.oozie.client.SLAEvent.Status; 037 import org.apache.oozie.command.CommandException; 038 import org.apache.oozie.command.PreconditionException; 039 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor; 040 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 041 042 /** 043 * The command checks workflow status for coordinator action. 044 */ 045 public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> { 046 private String actionId; 047 private int actionCheckDelay; 048 private CoordinatorActionBean coordAction = null; 049 private JPAService jpaService = null; 050 051 public CoordActionCheckXCommand(String actionId, int actionCheckDelay) { 052 super("coord_action_check", "coord_action_check", 0); 053 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 054 this.actionCheckDelay = actionCheckDelay; 055 } 056 057 /* (non-Javadoc) 058 * @see org.apache.oozie.command.XCommand#execute() 059 */ 060 @Override 061 protected Void execute() throws CommandException { 062 try { 063 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 064 WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetJPAExecutor(coordAction.getExternalId())); 065 066 Status slaStatus = null; 067 068 if (wf.getStatus() == WorkflowJob.Status.SUCCEEDED) { 069 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 070 // set pending to false as the status is SUCCEEDED 071 coordAction.setPending(0); 072 slaStatus = Status.SUCCEEDED; 073 } 074 else { 075 if (wf.getStatus() == WorkflowJob.Status.FAILED) { 076 coordAction.setStatus(CoordinatorAction.Status.FAILED); 077 slaStatus = Status.FAILED; 078 // set pending to false as the status is FAILED 079 coordAction.setPending(0); 080 } 081 else { 082 if (wf.getStatus() == WorkflowJob.Status.KILLED) { 083 coordAction.setStatus(CoordinatorAction.Status.KILLED); 084 slaStatus = Status.KILLED; 085 // set pending to false as the status is KILLED 086 coordAction.setPending(0); 087 } 088 else { 089 LOG.warn("Unexpected workflow " + wf.getId() + " STATUS " + wf.getStatus()); 090 coordAction.setLastModifiedTime(new Date()); 091 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(coordAction)); 092 return null; 093 } 094 } 095 } 096 097 LOG.debug("Updating Coordintaor actionId :" + coordAction.getId() + "status to =" 098 + coordAction.getStatus()); 099 coordAction.setLastModifiedTime(new Date()); 100 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(coordAction)); 101 102 if (slaStatus != null) { 103 SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus, 104 SlaAppType.COORDINATOR_ACTION, LOG); 105 } 106 } 107 catch (XException ex) { 108 LOG.warn("CoordActionCheckCommand Failed ", ex); 109 throw new CommandException(ex); 110 } 111 return null; 112 } 113 114 /* (non-Javadoc) 115 * @see org.apache.oozie.command.XCommand#getEntityKey() 116 */ 117 @Override 118 public String getEntityKey() { 119 return actionId; 120 } 121 122 /* (non-Javadoc) 123 * @see org.apache.oozie.command.XCommand#isLockRequired() 124 */ 125 @Override 126 protected boolean isLockRequired() { 127 return true; 128 } 129 130 /* (non-Javadoc) 131 * @see org.apache.oozie.command.XCommand#loadState() 132 */ 133 @Override 134 protected void loadState() throws CommandException { 135 try { 136 jpaService = Services.get().get(JPAService.class); 137 138 if (jpaService != null) { 139 coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId)); 140 LogUtils.setLogInfo(coordAction, logInfo); 141 } 142 else { 143 throw new CommandException(ErrorCode.E0610); 144 } 145 } 146 catch (XException ex) { 147 throw new CommandException(ex); 148 } 149 } 150 151 /* (non-Javadoc) 152 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 153 */ 154 @Override 155 protected void verifyPrecondition() throws CommandException, PreconditionException { 156 // if the action has been updated, quit this command 157 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000); 158 Timestamp cactionLmt = coordAction.getLastModifiedTimestamp(); 159 if (cactionLmt.after(actionCheckTs)) { 160 throw new PreconditionException(ErrorCode.E1100, "The coord action :" + actionId 161 + " has been udated. Ignore CoordActionCheckCommand!"); 162 } 163 if (coordAction.getStatus().equals(CoordinatorAction.Status.SUCCEEDED) 164 || coordAction.getStatus().equals(CoordinatorAction.Status.FAILED) 165 || coordAction.getStatus().equals(CoordinatorAction.Status.KILLED)) { 166 throw new PreconditionException(ErrorCode.E1100); 167 } 168 } 169 }