001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command; 020 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.List; 024 025import org.apache.oozie.CoordinatorActionBean; 026import org.apache.oozie.CoordinatorJobBean; 027import org.apache.oozie.client.Job; 028import org.apache.oozie.client.rest.JsonBean; 029import org.apache.oozie.command.coord.CoordinatorXCommand; 030import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 031import org.apache.oozie.util.ParamChecker; 032 033/** 034 * This is the base commands for all the jobs related commands . This will drive the statuses for all the jobs and all 035 * the jobs will follow the same state machine. 036 * 037 * @param <T> 038 */ 039public abstract class TransitionXCommand<T> extends XCommand<T> { 040 041 protected Job job; 042 protected List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 043 protected List<JsonBean> insertList = new ArrayList<JsonBean>(); 044 045 public TransitionXCommand(String name, String type, int priority) { 046 super(name, type, priority); 047 } 048 049 public TransitionXCommand(String name, String type, int priority, boolean dryrun) { 050 super(name, type, priority, dryrun); 051 } 052 053 /** 054 * Transit to the next status based on the result of the Job. 055 * 056 * @throws CommandException 057 */ 058 public abstract void transitToNext() throws CommandException; 059 060 /** 061 * Update the parent job. 062 * 063 * @throws CommandException 064 */ 065 public abstract void updateJob() throws CommandException; 066 067 /** 068 * This will be used to notify the parent about the status of that perticular job. 069 * 070 * @throws CommandException 071 */ 072 public abstract void notifyParent() throws CommandException; 073 074 /** 075 * This will be used to generate Job Notification events on status changes 076 * 077 * @param coordJob 078 * @param startTime 079 * @throws CommandException 080 */ 081 public void generateEvents(CoordinatorJobBean coordJob, Date startTime) throws CommandException { 082 for(UpdateEntry entry : updateList){ 083 JsonBean actionBean = entry.getBean(); 084 if (actionBean instanceof CoordinatorActionBean) { 085 CoordinatorActionBean caBean = (CoordinatorActionBean) actionBean; 086 caBean.setJobId(coordJob.getId()); 087 CoordinatorXCommand.generateEvent(caBean, coordJob.getUser(), coordJob.getAppName(), startTime); 088 } 089 // TODO generate Coord Job event 090 } 091 } 092 093 /** 094 * This will be used to perform atomically all the writes within this command. 095 * 096 * @throws CommandException 097 */ 098 public abstract void performWrites() throws CommandException; 099 100 /* (non-Javadoc) 101 * @see org.apache.oozie.command.XCommand#execute() 102 */ 103 @Override 104 protected T execute() throws CommandException { 105 transitToNext(); 106 updateJob(); 107 notifyParent(); 108 return null; 109 } 110 111 /** 112 * Get the Job for the command. 113 * 114 * @return the job 115 */ 116 public Job getJob() { 117 return job; 118 } 119 120 /** 121 * Set the Job for the command. 122 * 123 * @param job the job 124 */ 125 public void setJob(Job job) { 126 this.job = ParamChecker.notNull(job, "job"); 127 } 128 129}