001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import java.io.IOException; 021 import java.io.Writer; 022 023 import org.apache.hadoop.conf.Configuration; 024 import org.apache.oozie.client.CoordinatorJob; 025 import org.apache.oozie.client.WorkflowJob; 026 import org.apache.oozie.executor.jpa.JPAExecutorException; 027 import org.apache.oozie.service.JMSTopicService; 028 import org.apache.oozie.service.Services; 029 030 public abstract class BaseEngine { 031 public static final String USE_XCOMMAND = "oozie.useXCommand"; 032 033 protected String user; 034 035 /** 036 * Return the user name. 037 * 038 * @return the user name. 039 */ 040 public String getUser() { 041 return user; 042 } 043 044 /** 045 * Submit a job. 046 * <p/> 047 * It validates configuration properties. 048 * 049 * @param conf job configuration. 050 * @param startJob indicates if the job should be started or not. 051 * @return the job Id. 052 * @throws BaseEngineException thrown if the job could not be created. 053 */ 054 public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException; 055 056 /** 057 * Start a job. 058 * 059 * @param jobId job Id. 060 * @throws BaseEngineException thrown if the job could not be started. 061 */ 062 public abstract void start(String jobId) throws BaseEngineException; 063 064 /** 065 * Resume a job. 066 * 067 * @param jobId job Id. 068 * @throws BaseEngineException thrown if the job could not be resumed. 069 */ 070 public abstract void resume(String jobId) throws BaseEngineException; 071 072 /** 073 * Suspend a job. 074 * 075 * @param jobId job Id. 076 * @throws BaseEngineException thrown if the job could not be suspended. 077 */ 078 public abstract void suspend(String jobId) throws BaseEngineException; 079 080 /** 081 * Kill a job. 082 * 083 * @param jobId job Id. 084 * @throws BaseEngineException thrown if the job could not be killed. 085 */ 086 public abstract void kill(String jobId) throws BaseEngineException; 087 088 /** 089 * Change a coordinator job. 090 * 091 * @param jobId job Id. 092 * @param changeValue change value. 093 * @throws BaseEngineException thrown if the job could not be changed. 094 */ 095 public abstract void change(String jobId, String changeValue) throws BaseEngineException; 096 097 /** 098 * Rerun a job. 099 * 100 * @param jobId job Id to rerun. 101 * @param conf configuration information for the rerun. 102 * @throws BaseEngineException thrown if the job could not be rerun. 103 */ 104 public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException; 105 106 /** 107 * Return the info about a wf job. 108 * 109 * @param jobId job Id. 110 * @return the workflow job info. 111 * @throws DagEngineException thrown if the job info could not be obtained. 112 */ 113 public abstract WorkflowJob getJob(String jobId) throws BaseEngineException; 114 115 /** 116 * Return the info about a wf job with actions subset. 117 * 118 * @param jobId job Id 119 * @param start starting from this index in the list of actions belonging to the job 120 * @param length number of actions to be returned 121 * @return the workflow job info. 122 * @throws DagEngineException thrown if the job info could not be obtained. 123 */ 124 public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException; 125 126 /** 127 * Return the info about a coord job. 128 * 129 * @param jobId job Id. 130 * @return the coord job info. 131 * @throws BaseEngineException thrown if the job info could not be obtained. 132 */ 133 public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException; 134 135 /** 136 * Return the info about a coord job with actions subset. 137 * 138 * @param jobId job Id. 139 * @param filter the status filter 140 * @param start starting from this index in the list of actions belonging to the job 141 * @param length number of actions to be returned 142 * @param order true if actions are sorted in a descending order of nominal time, false if asc order 143 * @return the coord job info. 144 * @throws BaseEngineException thrown if the job info could not be obtained. 145 */ 146 public abstract CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc) 147 throws BaseEngineException; 148 149 /** 150 * Return the a job definition. 151 * 152 * @param jobId job Id. 153 * @return the job definition. 154 * @throws BaseEngineException thrown if the job definition could no be obtained. 155 */ 156 public abstract String getDefinition(String jobId) throws BaseEngineException; 157 158 /** 159 * Stream the log of a job. 160 * 161 * @param jobId job Id. 162 * @param writer writer to stream the log to. 163 * @throws IOException thrown if the log cannot be streamed. 164 * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for 165 * jobId. 166 */ 167 public abstract void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException; 168 169 /** 170 * Return the workflow Job ID for an external ID. 171 * <p/> 172 * This is reverse lookup for recovery purposes. 173 * 174 * @param externalId external ID provided at job submission time. 175 * @return the associated workflow job ID if any, <code>null</code> if none. 176 * @throws BaseEngineException thrown if the lookup could not be done. 177 */ 178 public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException; 179 180 /** 181 * Dry run a job; like {@link BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean) but doesn't actually execute 182 * the job. 183 * <p/> 184 * It validates configuration properties. 185 * 186 * @param conf job configuration. 187 * @return the result of the dryrun 188 * @throws BaseEngineException thrown if there was a problem doing the dryrun 189 */ 190 public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException; 191 192 193 /** 194 * Return the jms topic name for the job. 195 * 196 * @param jobId job Id. 197 * @return String the topic name 198 * @throws DagEngineException thrown if the jms info could not be obtained. 199 */ 200 public String getJMSTopicName(String jobId) throws DagEngineException { 201 JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class); 202 if (jmsTopicService != null) { 203 try { 204 return jmsTopicService.getTopic(jobId); 205 } 206 catch (JPAExecutorException e) { 207 throw new DagEngineException(ErrorCode.E1602, e); 208 } 209 } 210 else { 211 throw new DagEngineException(ErrorCode.E1602, 212 "JMSTopicService is not initialized. JMS notification" 213 + "may not be enabled"); 214 } 215 } 216 217 }