001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.oozie.client.CoordinatorJob; 021 import org.apache.oozie.CoordinatorActionBean; 022 import org.apache.oozie.CoordinatorJobBean; 023 import org.apache.oozie.XException; 024 import org.apache.oozie.command.CommandException; 025 import org.apache.oozie.store.CoordinatorStore; 026 import org.apache.oozie.store.StoreException; 027 import org.apache.oozie.util.ParamChecker; 028 import org.apache.oozie.util.XLog; 029 030 import org.apache.oozie.command.wf.ResumeCommand; 031 032 import java.util.Date; 033 import java.util.List; 034 035 public class CoordResumeCommand extends CoordinatorCommand<Void> { 036 037 private String jobId; 038 private final XLog log = XLog.getLog(getClass()); 039 040 public CoordResumeCommand(String id) { 041 super("coord_resume", "coord_resume", 1, XLog.STD); 042 this.jobId = ParamChecker.notEmpty(id, "id"); 043 } 044 045 protected Void call(CoordinatorStore store) throws StoreException, CommandException { 046 try { 047 // CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, 048 // false); 049 CoordinatorJobBean coordJob = store.getEntityManager().find(CoordinatorJobBean.class, jobId); 050 setLogInfo(coordJob); 051 if (coordJob.getStatus() == CoordinatorJob.Status.SUSPENDED) { 052 incrJobCounter(1); 053 coordJob.setStatus(CoordinatorJob.Status.PREP); 054 List<CoordinatorActionBean> actionList = store.getActionsForCoordinatorJob(jobId, false); 055 for (CoordinatorActionBean action : actionList) { 056 // queue a ResumeCommand 057 if (action.getExternalId() != null) { 058 queueCallable(new ResumeCommand(action.getExternalId())); 059 } 060 } 061 store.updateCoordinatorJob(coordJob); 062 } 063 // TODO queueCallable(new NotificationCommand(coordJob)); 064 else { 065 log.info("CoordResumeCommand not Resumed - " + "job not in SUSPENDED state " + jobId); 066 } 067 return null; 068 } 069 catch (XException ex) { 070 throw new CommandException(ex); 071 } 072 } 073 074 @Override 075 protected Void execute(CoordinatorStore store) throws StoreException, CommandException { 076 log.info("STARTED CoordResumeCommand for jobId=" + jobId); 077 try { 078 if (lock(jobId)) { 079 call(store); 080 } 081 else { 082 queueCallable(new CoordResumeCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL); 083 log.warn("CoordResumeCommand lock was not acquired - " + " failed " + jobId + ". Requeing the same."); 084 } 085 } 086 catch (InterruptedException e) { 087 queueCallable(new CoordResumeCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL); 088 log.warn("CoordResumeCommand lock acquiring failed " + " with exception " + e.getMessage() + " for job id " 089 + jobId + ". Requeing the same."); 090 } 091 finally { 092 log.info("ENDED CoordResumeCommand for jobId=" + jobId); 093 } 094 return null; 095 } 096 097 }