001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie; 020 021import java.io.IOException; 022import java.io.Writer; 023import java.util.Map; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.oozie.client.CoordinatorJob; 027import org.apache.oozie.client.OozieClientException; 028import org.apache.oozie.client.WorkflowJob; 029import org.apache.oozie.command.CommandException; 030import org.apache.oozie.executor.jpa.JPAExecutorException; 031import org.apache.oozie.service.JMSTopicService; 032import org.apache.oozie.service.Services; 033import org.apache.oozie.util.XLogAuditStreamer; 034import org.apache.oozie.util.XLogErrorStreamer; 035import org.apache.oozie.util.XLogStreamer; 036 037public abstract class BaseEngine { 038 public static final String USE_XCOMMAND = "oozie.useXCommand"; 039 040 protected String user; 041 042 /** 043 * Return the user name. 044 * 045 * @return the user name. 046 */ 047 public String getUser() { 048 return user; 049 } 050 051 /** 052 * Submit a job. 053 * <p> 054 * It validates configuration properties. 055 * 056 * @param conf job configuration. 057 * @param startJob indicates if the job should be started or not. 058 * @return the job Id. 059 * @throws BaseEngineException thrown if the job could not be created. 060 */ 061 public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException; 062 063 /** 064 * Start a job. 065 * 066 * @param jobId job Id. 067 * @throws BaseEngineException thrown if the job could not be started. 068 */ 069 public abstract void start(String jobId) throws BaseEngineException; 070 071 /** 072 * Resume a job. 073 * 074 * @param jobId job Id. 075 * @throws BaseEngineException thrown if the job could not be resumed. 076 */ 077 public abstract void resume(String jobId) throws BaseEngineException; 078 079 /** 080 * Suspend a job. 081 * 082 * @param jobId job Id. 083 * @throws BaseEngineException thrown if the job could not be suspended. 084 */ 085 public abstract void suspend(String jobId) throws BaseEngineException; 086 087 /** 088 * Kill a job. 089 * 090 * @param jobId job Id. 091 * @throws BaseEngineException thrown if the job could not be killed. 092 */ 093 public abstract void kill(String jobId) throws BaseEngineException; 094 095 /** 096 * Change a coordinator job. 097 * 098 * @param jobId job Id. 099 * @param changeValue change value. 100 * @throws BaseEngineException thrown if the job could not be changed. 101 */ 102 public abstract void change(String jobId, String changeValue) throws BaseEngineException; 103 104 /** 105 * Rerun a job. 106 * 107 * @param jobId job Id to rerun. 108 * @param conf configuration information for the rerun. 109 * @throws BaseEngineException thrown if the job could not be rerun. 110 */ 111 public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException; 112 113 /** 114 * Return the info about a wf job. 115 * 116 * @param jobId job Id. 117 * @return the workflow job info. 118 * @throws DagEngineException thrown if the job info could not be obtained. 119 */ 120 public abstract WorkflowJob getJob(String jobId) throws BaseEngineException; 121 122 /** 123 * Return the info about a wf job with actions subset. 124 * 125 * @param jobId job Id 126 * @param start starting from this index in the list of actions belonging to the job 127 * @param length number of actions to be returned 128 * @return the workflow job info. 129 * @throws DagEngineException thrown if the job info could not be obtained. 130 */ 131 public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException; 132 133 /** 134 * Return the info about a coord job. 135 * 136 * @param jobId job Id. 137 * @return the coord job info. 138 * @throws BaseEngineException thrown if the job info could not be obtained. 139 */ 140 public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException; 141 142 /** 143 * Return the info about a coord job with actions subset. 144 * 145 * @param jobId job Id. 146 * @param filter the status filter 147 * @param start starting from this index in the list of actions belonging to the job 148 * @param length number of actions to be returned 149 * @param desc true if actions are sorted in a descending order of nominal time, false if asc order 150 * @return the coord job info. 151 * @throws BaseEngineException thrown if the job info could not be obtained. 152 */ 153 public abstract CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc) 154 throws BaseEngineException; 155 156 /** 157 * Return the a job definition. 158 * 159 * @param jobId job Id. 160 * @return the job definition. 161 * @throws BaseEngineException thrown if the job definition could no be obtained. 162 */ 163 public abstract String getDefinition(String jobId) throws BaseEngineException; 164 165 /** 166 * Stream the log of a job. 167 * 168 * @param jobId job Id. 169 * @param writer writer to stream the log to. 170 * @param requestParameters additional parameters from the request 171 * @throws IOException thrown if the log cannot be streamed. 172 * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for 173 * jobId. 174 */ 175 public void streamLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException, 176 BaseEngineException { 177 try { 178 179 streamJobLog(new XLogStreamer(requestParameters), jobId, writer); 180 } 181 catch (CommandException e) { 182 throw new IOException(e); 183 } 184 } 185 186 /** 187 * Stream error log of a job. 188 * 189 * @param jobId job Id. 190 * @param writer writer to stream the log to. 191 * @param requestParameters additional parameters from the request 192 * @throws IOException thrown if the log cannot be streamed. 193 * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for 194 * jobId. 195 */ 196 public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException, 197 BaseEngineException { 198 try { 199 200 streamJobLog(new XLogErrorStreamer(requestParameters), jobId, writer); 201 } 202 catch (CommandException e) { 203 throw new IOException(e); 204 } 205 } /** 206 * Stream Audit log of a job. 207 * 208 * @param jobId job Id. 209 * @param writer writer to stream the log to. 210 * @param requestParameters additional parameters from the request 211 * @throws IOException thrown if the log cannot be streamed. 212 * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for 213 * jobId. 214 */ 215 public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException, 216 BaseEngineException { 217 try { 218 streamJobLog(new XLogAuditStreamer(requestParameters), jobId, writer); 219 } 220 catch (CommandException e) { 221 throw new IOException(e); 222 } 223 } 224 225 /** 226 * Return the workflow Job ID for an external ID. 227 * <p> 228 * This is reverse lookup for recovery purposes. 229 * 230 * @param externalId external ID provided at job submission time. 231 * @return the associated workflow job ID if any, <code>null</code> if none. 232 * @throws BaseEngineException thrown if the lookup could not be done. 233 */ 234 public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException; 235 236 /** 237 * Dry run a job; like {@link BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)} but doesn't actually execute 238 * the job. 239 * <p> 240 * It validates configuration properties. 241 * 242 * @param conf job configuration. 243 * @return the result of the dryrun 244 * @throws BaseEngineException thrown if there was a problem doing the dryrun 245 */ 246 public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException; 247 248 249 /** 250 * Return the jms topic name for the job. 251 * 252 * @param jobId job Id. 253 * @return String the topic name 254 * @throws DagEngineException thrown if the jms info could not be obtained. 255 */ 256 public String getJMSTopicName(String jobId) throws DagEngineException { 257 JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class); 258 if (jmsTopicService != null) { 259 try { 260 return jmsTopicService.getTopic(jobId); 261 } 262 catch (JPAExecutorException e) { 263 throw new DagEngineException(ErrorCode.E1602, e); 264 } 265 } 266 else { 267 throw new DagEngineException(ErrorCode.E1602, 268 "JMSTopicService is not initialized. JMS notification" 269 + "may not be enabled"); 270 } 271 } 272 273 /** 274 * Return the status for a Job ID 275 * 276 * @param jobId job Id. 277 * @return the job's status 278 * @throws BaseEngineException thrown if the job's status could not be obtained 279 */ 280 public abstract String getJobStatus(String jobId) throws BaseEngineException; 281 282 /** 283 * Enable SLA alert for job 284 * @param id job ID 285 * @param actions list of actions 286 * @param dates dates 287 * @param childIds child IDs 288 * @throws BaseEngineException thrown if SLA alert could not be enabled 289 */ 290 public abstract void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException; 291 292 /** 293 * Disable SLA alert for job 294 * @param id job ID 295 * @param actions list of actions 296 * @param dates dates 297 * @param childIds child IDs 298 * @throws BaseEngineException thrown if SLA alert could not be disabled 299 */ 300 public abstract void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException; 301 302 /** 303 * Change SLA properties for job 304 * @param id job ID 305 * @param actions list of actions 306 * @param dates dates 307 * @param childIds child IDs 308 * @param newParams parameters to 309 * @throws BaseEngineException thrown if SLA alert could not be enabled 310 */ 311 public abstract void changeSLA(String id, String actions, String dates, String childIds, String newParams) 312 throws BaseEngineException; 313 314 /** 315 * Stream job log. 316 * 317 * @param logStreamer the log streamer 318 * @param jobId the job id 319 * @param writer the writer 320 * @throws IOException Signals that an I/O exception has occurred. 321 * @throws BaseEngineException the base engine exception 322 */ 323 protected abstract void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException, 324 BaseEngineException; 325 326 interface BaseEngineCallable<V> { 327 V callOrThrow() throws BaseEngineException; 328 } 329 330 static <V> V callOrRethrow(final BaseEngineCallable<V> callable) throws OozieClientException { 331 try { 332 return callable.callOrThrow(); 333 } catch (final BaseEngineException e) { 334 throw new OozieClientException(e.getErrorCode().toString(), e); 335 } 336 } 337 338 static <V> V throwNoOp() throws OozieClientException { 339 throw new OozieClientException(ErrorCode.E0301.toString(), "no-op"); 340 } 341}