001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.util.Date;
021    
022    import org.apache.oozie.CoordinatorActionBean;
023    import org.apache.oozie.CoordinatorJobBean;
024    import org.apache.oozie.client.CoordinatorAction;
025    import org.apache.oozie.client.CoordinatorJob;
026    import org.apache.oozie.command.CommandException;
027    import org.apache.oozie.store.CoordinatorStore;
028    import org.apache.oozie.store.StoreException;
029    import org.apache.oozie.util.XLog;
030    
031    public class CoordActionTimeOut extends CoordinatorCommand<Void> {
032        private CoordinatorActionBean actionBean;
033        private final XLog log = XLog.getLog(getClass());
034    
035        public CoordActionTimeOut(CoordinatorActionBean actionBean) {
036            super("coord_action_timeout", "coord_action_timeout", 1, XLog.STD);
037            this.actionBean = actionBean;
038        }
039    
040        @Override
041        protected Void call(CoordinatorStore store) throws StoreException, CommandException {
042            // actionBean = store.getCoordinatorAction(actionBean.getId(), false);
043            actionBean = store.getEntityManager().find(CoordinatorActionBean.class, actionBean.getId());
044            if (actionBean.getStatus() == CoordinatorAction.Status.WAITING) {
045                actionBean.setStatus(CoordinatorAction.Status.TIMEDOUT);
046                queueCallable(new CoordActionNotification(actionBean), 100);
047                store.updateCoordinatorAction(actionBean);
048            }
049            return null;
050        }
051    
052        @Override
053        protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
054            String jobId = actionBean.getJobId();
055            setLogInfo(actionBean);
056            log.info("STARTED CoordinatorActionTimeOut for Action Id " + actionBean.getId() + " of job Id :"
057                    + actionBean.getJobId() + ". Timeout value is " + actionBean.getTimeOut() + " mins");
058            try {
059                if (lock(jobId)) {
060                    call(store);
061                }
062                else {
063                    queueCallable(new CoordActionTimeOut(actionBean), LOCK_FAILURE_REQUEUE_INTERVAL);
064                    log.warn("CoordinatorActionTimeOut lock was not acquired - " + " failed " + jobId
065                            + ". Requeing the same.");
066                }
067            }
068            catch (InterruptedException e) {
069                queueCallable(new CoordActionTimeOut(actionBean), LOCK_FAILURE_REQUEUE_INTERVAL);
070                log.warn("CoordinatorActionTimeOut lock acquiring failed " + " with exception " + e.getMessage()
071                        + " for job id " + jobId + ". Requeing the same.");
072            }
073            finally {
074                log.info("ENDED CoordinatorActionTimeOut for Action Id " + actionBean.getId());
075            }
076            return null;
077        }
078    }