001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.util.Date; 021 022 import org.apache.oozie.CoordinatorActionBean; 023 import org.apache.oozie.CoordinatorJobBean; 024 import org.apache.oozie.client.CoordinatorAction; 025 import org.apache.oozie.client.CoordinatorJob; 026 import org.apache.oozie.command.CommandException; 027 import org.apache.oozie.store.CoordinatorStore; 028 import org.apache.oozie.store.StoreException; 029 import org.apache.oozie.util.XLog; 030 031 public class CoordActionTimeOut extends CoordinatorCommand<Void> { 032 private CoordinatorActionBean actionBean; 033 private final XLog log = XLog.getLog(getClass()); 034 035 public CoordActionTimeOut(CoordinatorActionBean actionBean) { 036 super("coord_action_timeout", "coord_action_timeout", 1, XLog.STD); 037 this.actionBean = actionBean; 038 } 039 040 @Override 041 protected Void call(CoordinatorStore store) throws StoreException, CommandException { 042 // actionBean = store.getCoordinatorAction(actionBean.getId(), false); 043 actionBean = store.getEntityManager().find(CoordinatorActionBean.class, actionBean.getId()); 044 if (actionBean.getStatus() == CoordinatorAction.Status.WAITING) { 045 actionBean.setStatus(CoordinatorAction.Status.TIMEDOUT); 046 queueCallable(new CoordActionNotification(actionBean), 100); 047 store.updateCoordinatorAction(actionBean); 048 } 049 return null; 050 } 051 052 @Override 053 protected Void execute(CoordinatorStore store) throws StoreException, CommandException { 054 String jobId = actionBean.getJobId(); 055 setLogInfo(actionBean); 056 log.info("STARTED CoordinatorActionTimeOut for Action Id " + actionBean.getId() + " of job Id :" 057 + actionBean.getJobId() + ". Timeout value is " + actionBean.getTimeOut() + " mins"); 058 try { 059 if (lock(jobId)) { 060 call(store); 061 } 062 else { 063 queueCallable(new CoordActionTimeOut(actionBean), LOCK_FAILURE_REQUEUE_INTERVAL); 064 log.warn("CoordinatorActionTimeOut lock was not acquired - " + " failed " + jobId 065 + ". Requeing the same."); 066 } 067 } 068 catch (InterruptedException e) { 069 queueCallable(new CoordActionTimeOut(actionBean), LOCK_FAILURE_REQUEUE_INTERVAL); 070 log.warn("CoordinatorActionTimeOut lock acquiring failed " + " with exception " + e.getMessage() 071 + " for job id " + jobId + ". Requeing the same."); 072 } 073 finally { 074 log.info("ENDED CoordinatorActionTimeOut for Action Id " + actionBean.getId()); 075 } 076 return null; 077 } 078 }