001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command; 019 020import java.util.ArrayList; 021import java.util.Date; 022import java.util.List; 023 024import org.apache.oozie.CoordinatorActionBean; 025import org.apache.oozie.CoordinatorJobBean; 026import org.apache.oozie.client.Job; 027import org.apache.oozie.client.rest.JsonBean; 028import org.apache.oozie.command.coord.CoordinatorXCommand; 029import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 030import org.apache.oozie.util.ParamChecker; 031 032/** 033 * This is the base commands for all the jobs related commands . This will drive the statuses for all the jobs and all 034 * the jobs will follow the same state machine. 035 * 036 * @param <T> 037 */ 038public abstract class TransitionXCommand<T> extends XCommand<T> { 039 040 protected Job job; 041 protected List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 042 protected List<JsonBean> insertList = new ArrayList<JsonBean>(); 043 044 public TransitionXCommand(String name, String type, int priority) { 045 super(name, type, priority); 046 } 047 048 public TransitionXCommand(String name, String type, int priority, boolean dryrun) { 049 super(name, type, priority, dryrun); 050 } 051 052 /** 053 * Transit to the next status based on the result of the Job. 054 * 055 * @throws CommandException 056 */ 057 public abstract void transitToNext() throws CommandException; 058 059 /** 060 * Update the parent job. 061 * 062 * @throws CommandException 063 */ 064 public abstract void updateJob() throws CommandException; 065 066 /** 067 * This will be used to notify the parent about the status of that perticular job. 068 * 069 * @throws CommandException 070 */ 071 public abstract void notifyParent() throws CommandException; 072 073 /** 074 * This will be used to generate Job Notification events on status changes 075 * 076 * @param user 077 * @param appName 078 * @param em 079 * @throws CommandException 080 */ 081 public void generateEvents(CoordinatorJobBean coordJob, Date startTime) throws CommandException { 082 for(UpdateEntry entry : updateList){ 083 JsonBean actionBean = entry.getBean(); 084 if (actionBean instanceof CoordinatorActionBean) { 085 CoordinatorActionBean caBean = (CoordinatorActionBean) actionBean; 086 caBean.setJobId(coordJob.getId()); 087 CoordinatorXCommand.generateEvent(caBean, coordJob.getUser(), coordJob.getAppName(), startTime); 088 } 089 // TODO generate Coord Job event 090 } 091 } 092 093 /** 094 * This will be used to perform atomically all the writes within this command. 095 * 096 * @throws CommandException 097 */ 098 public abstract void performWrites() throws CommandException; 099 100 /* (non-Javadoc) 101 * @see org.apache.oozie.command.XCommand#execute() 102 */ 103 @Override 104 protected T execute() throws CommandException { 105 transitToNext(); 106 updateJob(); 107 notifyParent(); 108 return null; 109 } 110 111 /** 112 * Get the Job for the command. 113 * 114 * @return the job 115 */ 116 public Job getJob() { 117 return job; 118 } 119 120 /** 121 * Set the Job for the command. 122 * 123 * @param job the job 124 */ 125 public void setJob(Job job) { 126 this.job = ParamChecker.notNull(job, "job"); 127 } 128 129}