001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.IOException;
021    import java.io.Writer;
022    import java.util.ArrayList;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Properties;
028    import java.util.Set;
029    import java.util.StringTokenizer;
030    
031    import org.apache.hadoop.conf.Configuration;
032    import org.apache.oozie.client.CoordinatorJob;
033    import org.apache.oozie.client.OozieClient;
034    import org.apache.oozie.client.WorkflowJob;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.command.wf.CompletedActionCommand;
037    import org.apache.oozie.command.wf.DefinitionCommand;
038    import org.apache.oozie.command.wf.ExternalIdCommand;
039    import org.apache.oozie.command.wf.JobCommand;
040    import org.apache.oozie.command.wf.JobsCommand;
041    import org.apache.oozie.command.wf.KillCommand;
042    import org.apache.oozie.command.wf.ReRunCommand;
043    import org.apache.oozie.command.wf.ResumeCommand;
044    import org.apache.oozie.command.wf.StartCommand;
045    import org.apache.oozie.command.wf.SubmitCommand;
046    import org.apache.oozie.command.wf.SuspendCommand;
047    import org.apache.oozie.service.DagXLogInfoService;
048    import org.apache.oozie.service.Service;
049    import org.apache.oozie.service.Services;
050    import org.apache.oozie.service.XLogService;
051    import org.apache.oozie.util.ParamChecker;
052    import org.apache.oozie.util.XLog;
053    import org.apache.oozie.util.XLogStreamer;
054    
055    public abstract class BaseEngine {
056        public static final String USE_XCOMMAND = "oozie.useXCommand";
057    
058        protected String user;
059        protected String authToken;
060    
061        /**
062         * Return the user name.
063         *
064         * @return the user name.
065         */
066        public String getUser() {
067            return user;
068        }
069    
070        /**
071         * Return the authentication token.
072         *
073         * @return the authentication token.
074         */
075        protected String getAuthToken() {
076            return authToken;
077        }
078    
079        /**
080         * Submit a job.
081         * <p/>
082         * It validates configuration properties.
083         *
084         * @param conf job configuration.
085         * @param startJob indicates if the job should be started or not.
086         * @return the job Id.
087         * @throws BaseEngineException thrown if the job could not be created.
088         */
089        public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException;
090    
091        /**
092         * Start a job.
093         *
094         * @param jobId job Id.
095         * @throws BaseEngineException thrown if the job could not be started.
096         */
097        public abstract void start(String jobId) throws BaseEngineException;
098    
099        /**
100         * Resume a job.
101         *
102         * @param jobId job Id.
103         * @throws BaseEngineException thrown if the job could not be resumed.
104         */
105        public abstract void resume(String jobId) throws BaseEngineException;
106    
107        /**
108         * Suspend a job.
109         *
110         * @param jobId job Id.
111         * @throws BaseEngineException thrown if the job could not be suspended.
112         */
113        public abstract void suspend(String jobId) throws BaseEngineException;
114    
115        /**
116         * Kill a job.
117         *
118         * @param jobId job Id.
119         * @throws BaseEngineException thrown if the job could not be killed.
120         */
121        public abstract void kill(String jobId) throws BaseEngineException;
122    
123        /**
124         * Change a coordinator job.
125         *
126         * @param jobId job Id.
127         * @param changeValue change value.
128         * @throws BaseEngineException thrown if the job could not be changed.
129         */
130        public abstract void change(String jobId, String changeValue) throws BaseEngineException;
131    
132        /**
133         * Rerun a job.
134         *
135         * @param jobId job Id to rerun.
136         * @param conf configuration information for the rerun.
137         * @throws BaseEngineException thrown if the job could not be rerun.
138         */
139        public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException;
140    
141        /**
142         * Return the info about a wf job.
143         *
144         * @param jobId job Id.
145         * @return the workflow job info.
146         * @throws DagEngineException thrown if the job info could not be obtained.
147         */
148        public abstract WorkflowJob getJob(String jobId) throws BaseEngineException;
149    
150        /**
151         * Return the info about a wf job with actions subset.
152         *
153         * @param jobId job Id
154         * @param start starting from this index in the list of actions belonging to the job
155         * @param length number of actions to be returned
156         * @return the workflow job info.
157         * @throws DagEngineException thrown if the job info could not be obtained.
158         */
159        public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException;
160    
161        /**
162         * Return the info about a coord job.
163         *
164         * @param jobId job Id.
165         * @return the coord job info.
166         * @throws BaseEngineException thrown if the job info could not be obtained.
167         */
168        public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException;
169    
170        /**
171         * Return the info about a coord job with actions subset.
172         *
173         * @param jobId job Id.
174         * @param start starting from this index in the list of actions belonging to the job
175         * @param length number of actions to be returned
176         * @return the coord job info.
177         * @throws BaseEngineException thrown if the job info could not be obtained.
178         */
179        public abstract CoordinatorJob getCoordJob(String jobId, int start, int length) throws BaseEngineException;
180    
181        /**
182         * Return the a job definition.
183         *
184         * @param jobId job Id.
185         * @return the job definition.
186         * @throws BaseEngineException thrown if the job definition could no be obtained.
187         */
188        public abstract String getDefinition(String jobId) throws BaseEngineException;
189    
190        /**
191         * Stream the log of a job.
192         *
193         * @param jobId job Id.
194         * @param writer writer to stream the log to.
195         * @throws IOException thrown if the log cannot be streamed.
196         * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
197         *         jobId.
198         */
199        public abstract void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException;
200    
201        /**
202         * Return the workflow Job ID for an external ID.
203         * <p/>
204         * This is reverse lookup for recovery purposes.
205         *
206         * @param externalId external ID provided at job submission time.
207         * @return the associated workflow job ID if any, <code>null</code> if none.
208         * @throws BaseEngineException thrown if the lookup could not be done.
209         */
210        public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException;
211    
212        public abstract String dryrunSubmit(Configuration conf, boolean startJob) throws BaseEngineException;
213    
214    }