/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.sql.Timestamp;
import java.util.Date;

import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.WorkflowAction.Status;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.store.WorkflowStore;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLog;

/**
 * Executes the check command for ActionHandlers.
 * <p>
 * Ensures the action is in RUNNING state before executing {@link
 * ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
 * <p>
 * NOTE(review): the logger is obtained with {@code XLog.getLog(getClass())} at every call site instead of being
 * cached in a field or local. Presumably the returned logger reflects the log info installed by
 * {@code setLogInfo(...)} at the time it is obtained - confirm before hoisting those calls.
 */
public class ActionCheckCommand extends ActionCommand<Void> {
    public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";

    /** Id of the workflow action to check. */
    private final String id;

    /** Id of the owning workflow job; resolved from {@link #id} in {@link #execute(WorkflowStore)}. */
    private String jobId;

    /**
     * Minimum age, in seconds, the last check must have before the action is checked again. Values less than or
     * equal to zero disable the "recently checked" shortcut.
     */
    private final int actionCheckDelay;

    /**
     * Creates a check command with priority 0 and the "recently checked" shortcut disabled (delay of -1).
     *
     * @param id id of the workflow action to check.
     */
    public ActionCheckCommand(String id) {
        this(id, -1);
    }

    /**
     * Creates a check command.
     *
     * @param id id of the workflow action to check.
     * @param priority priority passed to the command base class.
     * @param checkDelay minimum number of seconds since the last check; the command does nothing if the action
     *        was checked more recently than that. Values less than or equal to zero disable this shortcut.
     */
    public ActionCheckCommand(String id, int priority, int checkDelay) {
        super("action.check", "action.check", priority);
        this.id = id;
        this.actionCheckDelay = checkDelay;
    }

    /**
     * Creates a check command with priority 0.
     *
     * @param id id of the workflow action to check.
     * @param checkDelay minimum number of seconds since the last check, see
     *        {@link #ActionCheckCommand(String, int, int)}.
     */
    public ActionCheckCommand(String id, int checkDelay) {
        this(id, 0, checkDelay);
    }

    /**
     * Loads the workflow job and the action and, if the action is pending and RUNNING, asks the action's
     * executor to check it.
     * <p>
     * Relies on {@link #jobId}, which {@link #execute(WorkflowStore)} resolves before invoking this method.
     * <ul>
     * <li>Action not pending or not RUNNING: nothing is done.</li>
     * <li>Action checked more recently than the configured delay: nothing is done.</li>
     * <li>Workflow job not RUNNING: only the action's last check time is updated and a warning is logged.</li>
     * <li>No executor registered for the action type: nothing is done.</li>
     * <li>Otherwise the executor's check is invoked; if the action reports execution complete an
     * {@link ActionEndCommand} is queued, or the job is failed if the executor did not provide execution data.
     * The action and workflow are then written back to the store.</li>
     * </ul>
     *
     * @param store store used to load and update the workflow job and the action.
     * @return always <code>null</code>.
     * @throws StoreException propagated from the store operations.
     * @throws CommandException propagated from the command helpers that are invoked.
     */
    @Override
    protected Void call(WorkflowStore store) throws StoreException, CommandException {
        WorkflowJobBean workflow = store.getWorkflow(jobId, false);
        setLogInfo(workflow);
        WorkflowActionBean action = store.getAction(id, false);
        setLogInfo(action);

        if (!(action.isPending() && action.getStatus() == WorkflowActionBean.Status.RUNNING)) {
            return null;
        }
        setLogInfo(workflow);

        // if the action has been updated, quit this command
        if (wasCheckedRecently(action)) {
            XLog.getLog(getClass()).debug(
                    "The wf action :" + id + " has been udated recently. Ignoring ActionCheckCommand!");
            return null;
        }

        if (workflow.getStatus() != WorkflowJob.Status.RUNNING) {
            // Only the action is touched here; the workflow bean is not written back.
            action.setLastCheckTime(new Date());
            store.updateAction(action);
            XLog.getLog(getClass()).warn(
                    "Action [{0}] status is running but WF Job [{1}] status is [{2}]. Expected status is RUNNING.",
                    action.getId(), workflow.getId(), workflow.getStatus());
            return null;
        }

        ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(action.getType());
        if (executor == null) {
            return null;
        }

        try {
            ActionExecutorContext context = new ActionCommand.ActionExecutorContext(workflow, action, false);
            incrActionCounter(action.getType(), 1);

            Instrumentation.Cron cron = new Instrumentation.Cron();
            cron.start();
            executor.check(context, action);
            cron.stop();
            addActionCron(action.getType(), cron);

            if (action.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    XLog.getLog(getClass()).warn(XLog.OPS,
                            "Action Completed, ActionExecutor [{0}] must call setExecutionData()",
                            executor.getType());
                    action.setErrorInfo(EXEC_DATA_MISSING,
                            "Execution Complete, but Execution Data Missing from Action");
                    failJob(context);
                    recordCheck(store, workflow, action);
                    return null;
                }
                action.setPending();
                queueCallable(new ActionEndCommand(action.getId(), action.getType()));
            }
            recordCheck(store, workflow, action);
        }
        catch (ActionExecutorException ex) {
            XLog.getLog(getClass()).warn(
                    "Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(),
                    ex.getMessage(), ex);

            switch (ex.getErrorType()) {
                case FAILED:
                    failAction(workflow, action);
                    break;
                default:
                    // Any other error type leaves the action and job status as they are.
                    break;
            }
            recordCheck(store, workflow, action);
        }
        return null;
    }

    /**
     * Tells whether the action's last check is more recent than the configured check delay.
     *
     * @param action action whose last check timestamp is inspected.
     * @return <code>true</code> if a positive delay is configured and the last check happened within it;
     *         <code>false</code> otherwise, including when the action has no last check timestamp.
     */
    private boolean wasCheckedRecently(WorkflowActionBean action) {
        if (actionCheckDelay <= 0) {
            return false;
        }
        // 1000L forces long arithmetic; an int product would silently wrap for large delays.
        long cutoffMillis = System.currentTimeMillis() - actionCheckDelay * 1000L;
        Timestamp lastCheck = action.getLastCheckTimestamp();
        return lastCheck != null && lastCheck.after(new Timestamp(cutoffMillis));
    }

    /**
     * Stamps the action with the current time as its last check time and writes both the action and the
     * workflow job back to the store.
     *
     * @param store store to update.
     * @param workflow workflow job to write back.
     * @param action action to stamp and write back.
     * @throws StoreException propagated from the store operations.
     * @throws CommandException declared for compatibility with the callers' throws clause.
     */
    private void recordCheck(WorkflowStore store, WorkflowJobBean workflow, WorkflowActionBean action)
            throws StoreException, CommandException {
        action.setLastCheckTime(new Date());
        store.updateAction(action);
        store.updateWorkflow(workflow);
    }

    /**
     * Marks the action and its workflow job as FAILED, clears the action's pending flag and increments the
     * failed jobs counter. The beans are only modified in memory; the caller writes them to the store.
     *
     * @param workflow workflow job to fail.
     * @param action action that failed.
     * @throws CommandException declared by this helper; propagated from the calls it makes, if any throws it.
     */
    private void failAction(WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException {
        XLog.getLog(getClass()).warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId());
        action.resetPending();
        action.setStatus(Status.FAILED);
        workflow.setStatus(WorkflowJob.Status.FAILED);
        incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1);
    }

    /**
     * Manual test entry point: initializes the services, runs a check for a hard-coded action id, waits and
     * then destroys the services.
     *
     * @param args ignored.
     * @throws Exception if the services cannot be initialized or the command fails.
     */
    public static void main(String[] args) throws Exception {
        // Keep the reference so the instance that was initialized is the one that gets destroyed.
        Services services = new Services();
        services.init();

        try {
            new ActionCheckCommand("0000001-100122154231282-oozie-dani-W@pig1").call();
            Thread.sleep(100000);
        }
        finally {
            services.destroy();
        }
    }

    /**
     * Resolves the workflow job id from the action id, acquires the job lock and runs
     * {@link #call(WorkflowStore)}. If the lock cannot be acquired, or the attempt is interrupted, an
     * equivalent command is queued to run after <code>LOCK_FAILURE_REQUEUE_INTERVAL</code>.
     *
     * @param store store passed on to {@link #call(WorkflowStore)}.
     * @return always <code>null</code>.
     * @throws CommandException propagated from {@link #call(WorkflowStore)}.
     * @throws StoreException propagated from {@link #call(WorkflowStore)}.
     */
    @Override
    protected Void execute(WorkflowStore store) throws CommandException, StoreException {
        try {
            XLog.getLog(getClass()).debug("STARTED ActionCheckCommand for wf actionId=" + id + " priority =" + getPriority());
            jobId = Services.get().get(UUIDService.class).getId(id);
            if (lock(jobId)) {
                call(store);
            }
            else {
                requeue();
                XLog.getLog(getClass()).warn("ActionCheckCommand lock was not acquired - failed {0}", id);
            }
        }
        catch (InterruptedException e) {
            // NOTE(review): the thread's interrupt flag is not restored here. Doing so is the usual idiom, but
            // its effect on the store work that follows this method is not visible from this class - confirm
            // before adding Thread.currentThread().interrupt().
            requeue();
            XLog.getLog(getClass()).warn("ActionCheckCommand lock was not acquired - interrupted exception failed {0}",
                    id);
        }
        XLog.getLog(getClass()).debug("ENDED ActionCheckCommand for wf actionId=" + id + ", jobId=" + jobId);
        return null;
    }

    /**
     * Queues a copy of this command to run after <code>LOCK_FAILURE_REQUEUE_INTERVAL</code>. The copy keeps
     * this command's priority and check delay, so a retry is not demoted to priority 0.
     */
    private void requeue() {
        queueCallable(new ActionCheckCommand(id, getPriority(), actionCheckDelay), LOCK_FAILURE_REQUEUE_INTERVAL);
    }
}