/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie;

import java.io.IOException;
import java.io.Writer;
import java.util.Date;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JMSTopicService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.util.XLogFilter;

/**
 * Common contract for the job engines: declares the job life-cycle, query, log-streaming and SLA operations
 * that concrete engines implement, and provides shared helpers for JMS topic lookup and log fetching.
 */
public abstract class BaseEngine {
    /** Configuration property name {@code oozie.useXCommand}. */
    public static final String USE_XCOMMAND = "oozie.useXCommand";

    /**
     * Kind of log that {@link #fetchLog(XLogFilter, Date, Date, Writer, Map, LOG_TYPE)} can stream.
     */
    public enum LOG_TYPE {
        LOG, ERROR_LOG, AUDIT_LOG
    }

    /** User name returned by {@link #getUser()}; this class never assigns it, subclasses are expected to. */
    protected String user;

    /**
     * Return the user name.
     *
     * @return the user name.
     */
    public String getUser() {
        return user;
    }

    /**
     * Submit a job.
     * <p>
     * It validates configuration properties.
     *
     * @param conf job configuration.
     * @param startJob indicates if the job should be started or not.
     * @return the job Id.
     * @throws BaseEngineException thrown if the job could not be created.
     */
    public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException;

    /**
     * Start a job.
     *
     * @param jobId job Id.
     * @throws BaseEngineException thrown if the job could not be started.
     */
    public abstract void start(String jobId) throws BaseEngineException;

    /**
     * Resume a job.
     *
     * @param jobId job Id.
     * @throws BaseEngineException thrown if the job could not be resumed.
     */
    public abstract void resume(String jobId) throws BaseEngineException;

    /**
     * Suspend a job.
     *
     * @param jobId job Id.
     * @throws BaseEngineException thrown if the job could not be suspended.
     */
    public abstract void suspend(String jobId) throws BaseEngineException;

    /**
     * Kill a job.
     *
     * @param jobId job Id.
     * @throws BaseEngineException thrown if the job could not be killed.
     */
    public abstract void kill(String jobId) throws BaseEngineException;

    /**
     * Change a coordinator job.
     *
     * @param jobId job Id.
     * @param changeValue change value.
     * @throws BaseEngineException thrown if the job could not be changed.
     */
    public abstract void change(String jobId, String changeValue) throws BaseEngineException;

    /**
     * Rerun a job.
     *
     * @param jobId job Id to rerun.
     * @param conf configuration information for the rerun.
     * @throws BaseEngineException thrown if the job could not be rerun.
     */
    public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException;

    /**
     * Return the info about a wf job.
     *
     * @param jobId job Id.
     * @return the workflow job info.
     * @throws BaseEngineException thrown if the job info could not be obtained.
     */
    public abstract WorkflowJob getJob(String jobId) throws BaseEngineException;

    /**
     * Return the info about a wf job with actions subset.
     *
     * @param jobId job Id
     * @param start starting from this index in the list of actions belonging to the job
     * @param length number of actions to be returned
     * @return the workflow job info.
     * @throws BaseEngineException thrown if the job info could not be obtained.
     */
    public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException;

    /**
     * Return the info about a coord job.
     *
     * @param jobId job Id.
     * @return the coord job info.
     * @throws BaseEngineException thrown if the job info could not be obtained.
     */
    public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException;

    /**
     * Return the info about a coord job with actions subset.
     *
     * @param jobId job Id.
     * @param filter the status filter
     * @param start starting from this index in the list of actions belonging to the job
     * @param length number of actions to be returned
     * @param desc true if actions are sorted in a descending order of nominal time, false if asc order
     * @return the coord job info.
     * @throws BaseEngineException thrown if the job info could not be obtained.
     */
    public abstract CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
            throws BaseEngineException;

    /**
     * Return a job definition.
     *
     * @param jobId job Id.
     * @return the job definition.
     * @throws BaseEngineException thrown if the job definition could not be obtained.
     */
    public abstract String getDefinition(String jobId) throws BaseEngineException;

    /**
     * Stream the log of a job.
     *
     * @param jobId job Id.
     * @param writer writer to stream the log to.
     * @param params additional parameters from the request
     * @throws IOException thrown if the log cannot be streamed.
     * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
     *         jobId.
     */
    public abstract void streamLog(String jobId, Writer writer, Map<String, String[]> params)
            throws IOException, BaseEngineException;

    /**
     * Stream error log of a job.
     *
     * @param jobId job Id.
     * @param writer writer to stream the log to.
     * @param params additional parameters from the request
     * @throws IOException thrown if the log cannot be streamed.
     * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
     *         jobId.
     */
    public abstract void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BaseEngineException;

    /**
     * Stream Audit log of a job.
     *
     * @param jobId job Id.
     * @param writer writer to stream the log to.
     * @param params additional parameters from the request
     * @throws IOException thrown if the log cannot be streamed.
     * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
     *         jobId.
     */
    public abstract void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BaseEngineException;

    /**
     * Return the workflow Job ID for an external ID.
     * <p>
     * This is reverse lookup for recovery purposes.
     *
     * @param externalId external ID provided at job submission time.
     * @return the associated workflow job ID if any, <code>null</code> if none.
     * @throws BaseEngineException thrown if the lookup could not be done.
     */
    public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException;

    /**
     * Dry run a job; like {@link BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)} but doesn't
     * actually execute the job.
     * <p>
     * It validates configuration properties.
     *
     * @param conf job configuration.
     * @return the result of the dryrun
     * @throws BaseEngineException thrown if there was a problem doing the dryrun
     */
    public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException;

    /**
     * Return the jms topic name for the job.
     *
     * @param jobId job Id.
     * @return String the topic name
     * @throws DagEngineException thrown with {@code E1602} if the {@link JMSTopicService} is not registered or
     *         if the topic lookup fails with a {@link JPAExecutorException}.
     */
    public String getJMSTopicName(String jobId) throws DagEngineException {
        JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
        if (jmsTopicService == null) {
            // The service lookup returned nothing, so there is no way to resolve a topic.
            throw new DagEngineException(ErrorCode.E1602,
                    "JMSTopicService is not initialized. JMS notification "
                            + "may not be enabled");
        }
        try {
            return jmsTopicService.getTopic(jobId);
        }
        catch (JPAExecutorException e) {
            // Keep the original exception as the cause.
            throw new DagEngineException(ErrorCode.E1602, e);
        }
    }

    /**
     * Return the status for a Job ID
     *
     * @param jobId job Id.
     * @return the job's status
     * @throws BaseEngineException thrown if the job's status could not be obtained
     */
    public abstract String getJobStatus(String jobId) throws BaseEngineException;

    /**
     * Enable SLA alert for job
     *
     * @param id job Id.
     * @param actions actions the change applies to; the accepted format is defined by the implementation.
     * @param dates dates the change applies to; the accepted format is defined by the implementation.
     * @param childIds child job Ids the change applies to; the accepted format is defined by the implementation.
     * @throws BaseEngineException thrown if the SLA alert could not be enabled.
     */
    public abstract void enableSLAAlert(String id, String actions, String dates, String childIds)
            throws BaseEngineException;

    /**
     * Disable SLA alert for job
     *
     * @param id job Id.
     * @param actions actions the change applies to; the accepted format is defined by the implementation.
     * @param dates dates the change applies to; the accepted format is defined by the implementation.
     * @param childIds child job Ids the change applies to; the accepted format is defined by the implementation.
     * @throws BaseEngineException thrown if the SLA alert could not be disabled.
     */
    public abstract void disableSLAAlert(String id, String actions, String dates, String childIds)
            throws BaseEngineException;

    /**
     * Change SLA properties for job
     *
     * @param id job Id.
     * @param actions actions the change applies to; the accepted format is defined by the implementation.
     * @param dates dates the change applies to; the accepted format is defined by the implementation.
     * @param childIds child job Ids the change applies to; the accepted format is defined by the implementation.
     * @param newParams the new SLA properties; the accepted format is defined by the implementation.
     * @throws BaseEngineException thrown if the SLA properties could not be changed.
     */
    public abstract void changeSLA(String id, String actions, String dates, String childIds, String newParams)
            throws BaseEngineException;

    /**
     * Stream log content to {@code writer} by delegating to the {@link XLogStreamingService} method that matches
     * {@code logType}.
     *
     * @param filter log filter passed through to the streaming service.
     * @param startTime start time passed through to the streaming service.
     * @param endTime end time passed through to the streaming service.
     * @param writer writer to stream the log to.
     * @param params additional parameters from the request
     * @param logType which log to stream.
     * @throws IOException thrown by the streaming service, or if {@code logType} is not one of the handled
     *         constants.
     */
    protected void fetchLog(XLogFilter filter, Date startTime, Date endTime, Writer writer,
            Map<String, String[]> params, LOG_TYPE logType) throws IOException {

        switch (logType) {
            case LOG:
                Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params);
                break;
            case ERROR_LOG:
                Services.get().get(XLogStreamingService.class)
                        .streamErrorLog(filter, startTime, endTime, writer, params);
                break;
            case AUDIT_LOG:
                Services.get().get(XLogStreamingService.class)
                        .streamAuditLog(filter, startTime, endTime, writer, params);
                break;
            default:
                // Only reachable if a constant is added to LOG_TYPE without a matching case here.
                throw new IOException("Unsupported log Type");
        }
    }

}