001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.oozie.CoordinatorJobBean; 021 import org.apache.oozie.client.CoordinatorJob; 022 import org.apache.oozie.command.CommandException; 023 import org.apache.oozie.store.CoordinatorStore; 024 import org.apache.oozie.store.StoreException; 025 import org.apache.oozie.util.XLog; 026 027 public class CoordRecoveryCommand extends CoordinatorCommand<Void> { 028 private final XLog log = XLog.getLog(getClass()); 029 private String jobId; 030 031 public CoordRecoveryCommand(String id) { 032 super("coord_recovery", "coord_recovery", 1, XLog.STD); 033 this.jobId = id; 034 } 035 036 @Override 037 protected Void call(CoordinatorStore store) throws StoreException { 038 //CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, true); 039 CoordinatorJobBean coordJob = store.getEntityManager().find(CoordinatorJobBean.class, jobId); 040 setLogInfo(coordJob); 041 if (coordJob.getStatus() == CoordinatorJob.Status.PREMATER) { 042 // update status of job from PREMATER to RUNNING in coordJob 043 coordJob.setStatus(CoordinatorJob.Status.RUNNING); 044 store.updateCoordinatorJob(coordJob); 045 log.debug("[" + jobId + "]: Recover status from PREMATER to RUNNING"); 046 } 047 else { 048 log.debug("[" + jobId + "]: already in non-PREMATER status"); 049 } 050 return null; 051 } 052 053 @Override 054 protected Void execute(CoordinatorStore store) throws StoreException, CommandException { 055 log.info("STARTED CoordRecoveryCommand for jobId=" + jobId); 056 try { 057 if (lock(jobId)) { 058 call(store); 059 } 060 else { 061 queueCallable(new CoordRecoveryCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL); 062 log.warn("CoordRecoveryCommand lock was not acquired - failed jobId=" + jobId 063 + ". Requeing the same."); 064 } 065 } 066 catch (InterruptedException e) { 067 queueCallable(new CoordRecoveryCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL); 068 log.warn("CoordRecoveryCommand lock acquiring failed with exception " + e.getMessage() 069 + " for jobId=" + jobId + " Requeing the same."); 070 } 071 finally { 072 log.info("ENDED CoordRecoveryCommand for jobId=" + jobId); 073 } 074 return null; 075 } 076 077 }