001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import java.io.IOException; 021 import java.io.Writer; 022 import java.util.ArrayList; 023 import java.util.HashMap; 024 import java.util.HashSet; 025 import java.util.List; 026 import java.util.Map; 027 import java.util.Properties; 028 import java.util.Set; 029 import java.util.StringTokenizer; 030 031 import org.apache.hadoop.conf.Configuration; 032 import org.apache.oozie.client.CoordinatorJob; 033 import org.apache.oozie.client.OozieClient; 034 import org.apache.oozie.client.WorkflowJob; 035 import org.apache.oozie.command.CommandException; 036 import org.apache.oozie.command.wf.CompletedActionCommand; 037 import org.apache.oozie.command.wf.DefinitionCommand; 038 import org.apache.oozie.command.wf.ExternalIdCommand; 039 import org.apache.oozie.command.wf.JobCommand; 040 import org.apache.oozie.command.wf.JobsCommand; 041 import org.apache.oozie.command.wf.KillCommand; 042 import org.apache.oozie.command.wf.ReRunCommand; 043 import org.apache.oozie.command.wf.ResumeCommand; 044 import org.apache.oozie.command.wf.StartCommand; 045 import org.apache.oozie.command.wf.SubmitCommand; 046 import org.apache.oozie.command.wf.SuspendCommand; 047 import org.apache.oozie.service.DagXLogInfoService; 048 import org.apache.oozie.service.Service; 049 import org.apache.oozie.service.Services; 050 import org.apache.oozie.service.XLogService; 051 import org.apache.oozie.util.ParamChecker; 052 import org.apache.oozie.util.XLog; 053 import org.apache.oozie.util.XLogStreamer; 054 055 public abstract class BaseEngine { 056 public static final String USE_XCOMMAND = "oozie.useXCommand"; 057 058 protected String user; 059 protected String authToken; 060 061 /** 062 * Return the user name. 063 * 064 * @return the user name. 065 */ 066 public String getUser() { 067 return user; 068 } 069 070 /** 071 * Return the authentication token. 072 * 073 * @return the authentication token. 074 */ 075 protected String getAuthToken() { 076 return authToken; 077 } 078 079 /** 080 * Submit a job. 081 * <p/> 082 * It validates configuration properties. 083 * 084 * @param conf job configuration. 085 * @param startJob indicates if the job should be started or not. 086 * @return the job Id. 087 * @throws BaseEngineException thrown if the job could not be created. 088 */ 089 public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException; 090 091 /** 092 * Start a job. 093 * 094 * @param jobId job Id. 095 * @throws BaseEngineException thrown if the job could not be started. 096 */ 097 public abstract void start(String jobId) throws BaseEngineException; 098 099 /** 100 * Resume a job. 101 * 102 * @param jobId job Id. 103 * @throws BaseEngineException thrown if the job could not be resumed. 104 */ 105 public abstract void resume(String jobId) throws BaseEngineException; 106 107 /** 108 * Suspend a job. 109 * 110 * @param jobId job Id. 111 * @throws BaseEngineException thrown if the job could not be suspended. 112 */ 113 public abstract void suspend(String jobId) throws BaseEngineException; 114 115 /** 116 * Kill a job. 117 * 118 * @param jobId job Id. 119 * @throws BaseEngineException thrown if the job could not be killed. 120 */ 121 public abstract void kill(String jobId) throws BaseEngineException; 122 123 /** 124 * Change a coordinator job. 125 * 126 * @param jobId job Id. 127 * @param changeValue change value. 128 * @throws BaseEngineException thrown if the job could not be changed. 129 */ 130 public abstract void change(String jobId, String changeValue) throws BaseEngineException; 131 132 /** 133 * Rerun a job. 134 * 135 * @param jobId job Id to rerun. 136 * @param conf configuration information for the rerun. 137 * @throws BaseEngineException thrown if the job could not be rerun. 138 */ 139 public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException; 140 141 /** 142 * Return the info about a wf job. 143 * 144 * @param jobId job Id. 145 * @return the workflow job info. 146 * @throws DagEngineException thrown if the job info could not be obtained. 147 */ 148 public abstract WorkflowJob getJob(String jobId) throws BaseEngineException; 149 150 /** 151 * Return the info about a wf job with actions subset. 152 * 153 * @param jobId job Id 154 * @param start starting from this index in the list of actions belonging to the job 155 * @param length number of actions to be returned 156 * @return the workflow job info. 157 * @throws DagEngineException thrown if the job info could not be obtained. 158 */ 159 public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException; 160 161 /** 162 * Return the info about a coord job. 163 * 164 * @param jobId job Id. 165 * @return the coord job info. 166 * @throws BaseEngineException thrown if the job info could not be obtained. 167 */ 168 public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException; 169 170 /** 171 * Return the info about a coord job with actions subset. 172 * 173 * @param jobId job Id. 174 * @param start starting from this index in the list of actions belonging to the job 175 * @param length number of actions to be returned 176 * @return the coord job info. 177 * @throws BaseEngineException thrown if the job info could not be obtained. 178 */ 179 public abstract CoordinatorJob getCoordJob(String jobId, int start, int length) throws BaseEngineException; 180 181 /** 182 * Return the a job definition. 183 * 184 * @param jobId job Id. 185 * @return the job definition. 186 * @throws BaseEngineException thrown if the job definition could no be obtained. 187 */ 188 public abstract String getDefinition(String jobId) throws BaseEngineException; 189 190 /** 191 * Stream the log of a job. 192 * 193 * @param jobId job Id. 194 * @param writer writer to stream the log to. 195 * @throws IOException thrown if the log cannot be streamed. 196 * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for 197 * jobId. 198 */ 199 public abstract void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException; 200 201 /** 202 * Return the workflow Job ID for an external ID. 203 * <p/> 204 * This is reverse lookup for recovery purposes. 205 * 206 * @param externalId external ID provided at job submission time. 207 * @return the associated workflow job ID if any, <code>null</code> if none. 208 * @throws BaseEngineException thrown if the lookup could not be done. 209 */ 210 public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException; 211 212 public abstract String dryrunSubmit(Configuration conf, boolean startJob) throws BaseEngineException; 213 214 }