001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie; 019 020import java.io.IOException; 021import java.io.Writer; 022import java.util.Map; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.oozie.client.CoordinatorJob; 026import org.apache.oozie.client.WorkflowJob; 027import org.apache.oozie.executor.jpa.JPAExecutorException; 028import org.apache.oozie.service.JMSTopicService; 029import org.apache.oozie.service.Services; 030 031public abstract class BaseEngine { 032 public static final String USE_XCOMMAND = "oozie.useXCommand"; 033 034 protected String user; 035 036 /** 037 * Return the user name. 038 * 039 * @return the user name. 040 */ 041 public String getUser() { 042 return user; 043 } 044 045 /** 046 * Submit a job. 047 * <p/> 048 * It validates configuration properties. 049 * 050 * @param conf job configuration. 051 * @param startJob indicates if the job should be started or not. 052 * @return the job Id. 053 * @throws BaseEngineException thrown if the job could not be created. 054 */ 055 public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException; 056 057 /** 058 * Start a job. 059 * 060 * @param jobId job Id. 061 * @throws BaseEngineException thrown if the job could not be started. 062 */ 063 public abstract void start(String jobId) throws BaseEngineException; 064 065 /** 066 * Resume a job. 067 * 068 * @param jobId job Id. 069 * @throws BaseEngineException thrown if the job could not be resumed. 070 */ 071 public abstract void resume(String jobId) throws BaseEngineException; 072 073 /** 074 * Suspend a job. 075 * 076 * @param jobId job Id. 077 * @throws BaseEngineException thrown if the job could not be suspended. 078 */ 079 public abstract void suspend(String jobId) throws BaseEngineException; 080 081 /** 082 * Kill a job. 083 * 084 * @param jobId job Id. 085 * @throws BaseEngineException thrown if the job could not be killed. 086 */ 087 public abstract void kill(String jobId) throws BaseEngineException; 088 089 /** 090 * Change a coordinator job. 091 * 092 * @param jobId job Id. 093 * @param changeValue change value. 094 * @throws BaseEngineException thrown if the job could not be changed. 095 */ 096 public abstract void change(String jobId, String changeValue) throws BaseEngineException; 097 098 /** 099 * Rerun a job. 100 * 101 * @param jobId job Id to rerun. 102 * @param conf configuration information for the rerun. 103 * @throws BaseEngineException thrown if the job could not be rerun. 104 */ 105 public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException; 106 107 /** 108 * Return the info about a wf job. 109 * 110 * @param jobId job Id. 111 * @return the workflow job info. 112 * @throws DagEngineException thrown if the job info could not be obtained. 113 */ 114 public abstract WorkflowJob getJob(String jobId) throws BaseEngineException; 115 116 /** 117 * Return the info about a wf job with actions subset. 118 * 119 * @param jobId job Id 120 * @param start starting from this index in the list of actions belonging to the job 121 * @param length number of actions to be returned 122 * @return the workflow job info. 123 * @throws DagEngineException thrown if the job info could not be obtained. 124 */ 125 public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException; 126 127 /** 128 * Return the info about a coord job. 129 * 130 * @param jobId job Id. 131 * @return the coord job info. 132 * @throws BaseEngineException thrown if the job info could not be obtained. 133 */ 134 public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException; 135 136 /** 137 * Return the info about a coord job with actions subset. 138 * 139 * @param jobId job Id. 140 * @param filter the status filter 141 * @param start starting from this index in the list of actions belonging to the job 142 * @param length number of actions to be returned 143 * @param order true if actions are sorted in a descending order of nominal time, false if asc order 144 * @return the coord job info. 145 * @throws BaseEngineException thrown if the job info could not be obtained. 146 */ 147 public abstract CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc) 148 throws BaseEngineException; 149 150 /** 151 * Return the a job definition. 152 * 153 * @param jobId job Id. 154 * @return the job definition. 155 * @throws BaseEngineException thrown if the job definition could no be obtained. 156 */ 157 public abstract String getDefinition(String jobId) throws BaseEngineException; 158 159 /** 160 * Stream the log of a job. 161 * 162 * @param jobId job Id. 163 * @param writer writer to stream the log to. 164 * @param params additional parameters from the request 165 * @throws IOException thrown if the log cannot be streamed. 166 * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for 167 * jobId. 168 */ 169 public abstract void streamLog(String jobId, Writer writer, Map<String, String[]> params) 170 throws IOException, BaseEngineException; 171 172 /** 173 * Return the workflow Job ID for an external ID. 174 * <p/> 175 * This is reverse lookup for recovery purposes. 176 * 177 * @param externalId external ID provided at job submission time. 178 * @return the associated workflow job ID if any, <code>null</code> if none. 179 * @throws BaseEngineException thrown if the lookup could not be done. 180 */ 181 public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException; 182 183 /** 184 * Dry run a job; like {@link BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean) but doesn't actually execute 185 * the job. 186 * <p/> 187 * It validates configuration properties. 188 * 189 * @param conf job configuration. 190 * @return the result of the dryrun 191 * @throws BaseEngineException thrown if there was a problem doing the dryrun 192 */ 193 public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException; 194 195 196 /** 197 * Return the jms topic name for the job. 198 * 199 * @param jobId job Id. 200 * @return String the topic name 201 * @throws DagEngineException thrown if the jms info could not be obtained. 202 */ 203 public String getJMSTopicName(String jobId) throws DagEngineException { 204 JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class); 205 if (jmsTopicService != null) { 206 try { 207 return jmsTopicService.getTopic(jobId); 208 } 209 catch (JPAExecutorException e) { 210 throw new DagEngineException(ErrorCode.E1602, e); 211 } 212 } 213 else { 214 throw new DagEngineException(ErrorCode.E1602, 215 "JMSTopicService is not initialized. JMS notification" 216 + "may not be enabled"); 217 } 218 } 219 220}