001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.oozie.CoordinatorActionBean; 021 import org.apache.oozie.WorkflowJobBean; 022 import org.apache.oozie.XException; 023 import org.apache.oozie.store.CoordinatorStore; 024 import org.apache.oozie.store.StoreException; 025 import org.apache.oozie.util.XLog; 026 import org.apache.oozie.util.db.SLADbOperations; 027 import org.apache.oozie.client.CoordinatorAction; 028 import org.apache.oozie.client.WorkflowJob; 029 import org.apache.oozie.client.SLAEvent.SlaAppType; 030 import org.apache.oozie.client.SLAEvent.Status; 031 import org.apache.oozie.command.CommandException; 032 033 public class CoordActionUpdateCommand extends CoordinatorCommand<Void> { 034 private final XLog log = XLog.getLog(getClass()); 035 private WorkflowJobBean workflow; 036 private CoordinatorActionBean caction = null; 037 038 public CoordActionUpdateCommand(WorkflowJobBean workflow) { 039 super("coord-action-update", "coord-action-update", 1, XLog.OPS); 040 this.workflow = workflow; 041 } 042 043 @Override 044 protected Void call(CoordinatorStore cstore) throws StoreException, CommandException { 045 try { 046 if (workflow.getStatus() == WorkflowJob.Status.RUNNING 047 || workflow.getStatus() == WorkflowJob.Status.SUSPENDED) { 048 //update lastModifiedTime 049 cstore.updateCoordinatorAction(caction); 050 return null; 051 } 052 // CoordinatorActionBean caction = 053 // cstore.getCoordinatorActionForExternalId(workflow.getId()); 054 Status slaStatus = null; 055 if (caction != null) { 056 if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) { 057 caction.setStatus(CoordinatorAction.Status.SUCCEEDED); 058 slaStatus = Status.SUCCEEDED; 059 } 060 else { 061 if (workflow.getStatus() == WorkflowJob.Status.FAILED) { 062 caction.setStatus(CoordinatorAction.Status.FAILED); 063 slaStatus = Status.FAILED; 064 } 065 else { 066 if (workflow.getStatus() == WorkflowJob.Status.KILLED) { 067 caction.setStatus(CoordinatorAction.Status.KILLED); 068 slaStatus = Status.KILLED; 069 } 070 else { 071 log.warn( 072 "Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus()); 073 //update lastModifiedTime 074 cstore.updateCoordinatorAction(caction); 075 return null; 076 } 077 } 078 } 079 080 log.info( 081 "Updating Coordintaor id :" + caction.getId() + "status to =" + caction.getStatus()); 082 cstore.updateCoordinatorAction(caction); 083 if (slaStatus != null) { 084 SLADbOperations.writeStausEvent(caction.getSlaXml(), caction.getId(), cstore, slaStatus, 085 SlaAppType.COORDINATOR_ACTION); 086 } 087 queueCallable(new CoordActionReadyCommand(caction.getJobId())); 088 } 089 } 090 catch (XException ex) { 091 log.warn("CoordActionUpdate Failed ", ex.getMessage()); 092 throw new CommandException(ex); 093 } 094 return null; 095 } 096 097 @Override 098 protected Void execute(CoordinatorStore store) throws StoreException, CommandException { 099 log.info("STARTED CoordActionUpdateCommand for wfId=" + workflow.getId()); 100 caction = store.getCoordinatorActionForExternalId(workflow.getId()); 101 if (caction == null) { 102 log.info("ENDED CoordActionUpdateCommand for wfId=" + workflow.getId() + ", coord action is null"); 103 return null; 104 } 105 setLogInfo(caction); 106 String jobId = caction.getJobId(); 107 try { 108 if (lock(jobId)) { 109 call(store); 110 } 111 else { 112 queueCallable(new CoordActionUpdateCommand(workflow), LOCK_FAILURE_REQUEUE_INTERVAL); 113 log.warn("CoordActionUpdateCommand lock was not acquired - failed JobId=" + jobId + ", wfId=" 114 + workflow.getId() + ". Requeing the same."); 115 } 116 } 117 catch (InterruptedException e) { 118 queueCallable(new CoordActionUpdateCommand(workflow), LOCK_FAILURE_REQUEUE_INTERVAL); 119 log.warn("CoordActionUpdateCommand lock acquiring failed with exception " + e.getMessage() + " for jobId=" 120 + jobId + ", wfId=" + workflow.getId() + ". Requeing the same."); 121 } 122 finally { 123 log.info("ENDED CoordActionUpdateCommand for wfId=" + workflow.getId() + ", jobId=" + jobId); 124 } 125 return null; 126 } 127 }