001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.IOException;
021    import java.io.Writer;
022    
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.oozie.client.CoordinatorJob;
025    import org.apache.oozie.client.WorkflowJob;
026    import org.apache.oozie.executor.jpa.JPAExecutorException;
027    import org.apache.oozie.service.JMSTopicService;
028    import org.apache.oozie.service.Services;
029    
030    public abstract class BaseEngine {
031        public static final String USE_XCOMMAND = "oozie.useXCommand";
032    
033        protected String user;
034    
035        /**
036         * Return the user name.
037         *
038         * @return the user name.
039         */
040        public String getUser() {
041            return user;
042        }
043    
044        /**
045         * Submit a job.
046         * <p/>
047         * It validates configuration properties.
048         *
049         * @param conf job configuration.
050         * @param startJob indicates if the job should be started or not.
051         * @return the job Id.
052         * @throws BaseEngineException thrown if the job could not be created.
053         */
054        public abstract String submitJob(Configuration conf, boolean startJob) throws BaseEngineException;
055    
056        /**
057         * Start a job.
058         *
059         * @param jobId job Id.
060         * @throws BaseEngineException thrown if the job could not be started.
061         */
062        public abstract void start(String jobId) throws BaseEngineException;
063    
064        /**
065         * Resume a job.
066         *
067         * @param jobId job Id.
068         * @throws BaseEngineException thrown if the job could not be resumed.
069         */
070        public abstract void resume(String jobId) throws BaseEngineException;
071    
072        /**
073         * Suspend a job.
074         *
075         * @param jobId job Id.
076         * @throws BaseEngineException thrown if the job could not be suspended.
077         */
078        public abstract void suspend(String jobId) throws BaseEngineException;
079    
080        /**
081         * Kill a job.
082         *
083         * @param jobId job Id.
084         * @throws BaseEngineException thrown if the job could not be killed.
085         */
086        public abstract void kill(String jobId) throws BaseEngineException;
087    
088        /**
089         * Change a coordinator job.
090         *
091         * @param jobId job Id.
092         * @param changeValue change value.
093         * @throws BaseEngineException thrown if the job could not be changed.
094         */
095        public abstract void change(String jobId, String changeValue) throws BaseEngineException;
096    
097        /**
098         * Rerun a job.
099         *
100         * @param jobId job Id to rerun.
101         * @param conf configuration information for the rerun.
102         * @throws BaseEngineException thrown if the job could not be rerun.
103         */
104        public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException;
105    
106        /**
107         * Return the info about a wf job.
108         *
109         * @param jobId job Id.
110         * @return the workflow job info.
111         * @throws DagEngineException thrown if the job info could not be obtained.
112         */
113        public abstract WorkflowJob getJob(String jobId) throws BaseEngineException;
114    
115        /**
116         * Return the info about a wf job with actions subset.
117         *
118         * @param jobId job Id
119         * @param start starting from this index in the list of actions belonging to the job
120         * @param length number of actions to be returned
121         * @return the workflow job info.
122         * @throws DagEngineException thrown if the job info could not be obtained.
123         */
124        public abstract WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException;
125    
126        /**
127         * Return the info about a coord job.
128         *
129         * @param jobId job Id.
130         * @return the coord job info.
131         * @throws BaseEngineException thrown if the job info could not be obtained.
132         */
133        public abstract CoordinatorJob getCoordJob(String jobId) throws BaseEngineException;
134    
135        /**
136         * Return the info about a coord job with actions subset.
137         *
138         * @param jobId job Id.
139         * @param filter the status filter
140         * @param start starting from this index in the list of actions belonging to the job
141         * @param length number of actions to be returned
142         * @param order true if actions are sorted in a descending order of nominal time, false if asc order
143         * @return the coord job info.
144         * @throws BaseEngineException thrown if the job info could not be obtained.
145         */
146        public abstract CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
147                throws BaseEngineException;
148    
149        /**
150         * Return the a job definition.
151         *
152         * @param jobId job Id.
153         * @return the job definition.
154         * @throws BaseEngineException thrown if the job definition could no be obtained.
155         */
156        public abstract String getDefinition(String jobId) throws BaseEngineException;
157    
158        /**
159         * Stream the log of a job.
160         *
161         * @param jobId job Id.
162         * @param writer writer to stream the log to.
163         * @throws IOException thrown if the log cannot be streamed.
164         * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
165         *         jobId.
166         */
167        public abstract void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException;
168    
169        /**
170         * Return the workflow Job ID for an external ID.
171         * <p/>
172         * This is reverse lookup for recovery purposes.
173         *
174         * @param externalId external ID provided at job submission time.
175         * @return the associated workflow job ID if any, <code>null</code> if none.
176         * @throws BaseEngineException thrown if the lookup could not be done.
177         */
178        public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException;
179    
180        /**
181         * Dry run a job; like {@link BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean) but doesn't actually execute
182         * the job.
183         * <p/>
184         * It validates configuration properties.
185         *
186         * @param conf job configuration.
187         * @return the result of the dryrun
188         * @throws BaseEngineException thrown if there was a problem doing the dryrun
189         */
190        public abstract String dryRunSubmit(Configuration conf) throws BaseEngineException;
191    
192    
193        /**
194         * Return the jms topic name for the job.
195         *
196         * @param jobId job Id.
197         * @return String the topic name
198         * @throws DagEngineException thrown if the jms info could not be obtained.
199         */
200        public String getJMSTopicName(String jobId) throws DagEngineException {
201            JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
202            if (jmsTopicService != null) {
203                try {
204                    return jmsTopicService.getTopic(jobId);
205                }
206                catch (JPAExecutorException e) {
207                   throw new DagEngineException(ErrorCode.E1602, e);
208                }
209            }
210            else {
211                throw new DagEngineException(ErrorCode.E1602,
212                        "JMSTopicService is not initialized. JMS notification"
213                                + "may not be enabled");
214            }
215        }
216    
217    }