001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.List;
022    
023    import javax.servlet.http.HttpServletRequest;
024    import javax.servlet.http.HttpServletResponse;
025    
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.oozie.BundleJobBean;
028    import org.apache.oozie.BundleJobInfo;
029    import org.apache.oozie.CoordinatorEngine;
030    import org.apache.oozie.BundleEngine;
031    import org.apache.oozie.CoordinatorEngineException;
032    import org.apache.oozie.BundleEngineException;
033    import org.apache.oozie.CoordinatorJobBean;
034    import org.apache.oozie.CoordinatorJobInfo;
035    import org.apache.oozie.DagEngine;
036    import org.apache.oozie.DagEngineException;
037    import org.apache.oozie.ErrorCode;
038    import org.apache.oozie.WorkflowJobBean;
039    import org.apache.oozie.WorkflowsInfo;
040    import org.apache.oozie.client.OozieClient;
041    import org.apache.oozie.client.rest.JsonTags;
042    import org.apache.oozie.client.rest.RestConstants;
043    import org.apache.oozie.service.CoordinatorEngineService;
044    import org.apache.oozie.service.DagEngineService;
045    import org.apache.oozie.service.BundleEngineService;
046    import org.apache.oozie.service.Services;
047    import org.apache.oozie.util.XLog;
048    import org.apache.oozie.util.XmlUtils;
049    import org.json.simple.JSONObject;
050    
051    public class V1JobsServlet extends BaseJobsServlet {
052    
053        private static final String INSTRUMENTATION_NAME = "v1jobs";
054    
055        public V1JobsServlet() {
056            super(INSTRUMENTATION_NAME);
057        }
058    
059        /**
060         * v1 service implementation to submit a job, either workflow or coordinator
061         */
062        @Override
063        protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
064                IOException {
065            JSONObject json = null;
066    
067            String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
068    
069            if (jobType == null) {
070                String wfPath = conf.get(OozieClient.APP_PATH);
071                String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
072                String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
073    
074                ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
075    
076                if (wfPath != null) {
077                    json = submitWorkflowJob(request, conf);
078                }
079                else if (coordPath != null) {
080                    json = submitCoordinatorJob(request, conf);
081                }
082                else {
083                    json = submitBundleJob(request, conf);
084                }
085            }
086            else { // This is a http submission job
087                if (jobType.equals("pig") || jobType.equals("mapreduce")) {
088                    json = submitHttpJob(request, conf, jobType);
089                }
090                else {
091                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
092                            RestConstants.JOBTYPE_PARAM, jobType);
093                }
094            }
095            return json;
096        }
097    
098        /**
099         * v1 service implementation to get a JSONObject representation of a job from its external ID
100         */
101        @Override
102        protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException,
103                IOException {
104            JSONObject json = null;
105            /*
106             * Configuration conf = new XConfiguration(); String wfPath =
107             * conf.get(OozieClient.APP_PATH); String coordPath =
108             * conf.get(OozieClient.COORDINATOR_APP_PATH);
109             *
110             * ServletUtilities.ValidateAppPath(wfPath, coordPath);
111             */
112            String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
113            jobtype = (jobtype != null) ? jobtype : "wf";
114            if (jobtype.contains("wf")) {
115                json = getWorkflowJobIdForExternalId(request, externalId);
116            }
117            else {
118                json = getCoordinatorJobIdForExternalId(request, externalId);
119            }
120            return json;
121        }
122    
123        /**
124         * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
125         * windows embedded in the request object
126         */
127        @Override
128        protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
129            JSONObject json = null;
130            String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
131            jobtype = (jobtype != null) ? jobtype : "wf";
132    
133            if (jobtype.contains("wf")) {
134                json = getWorkflowJobs(request);
135            }
136            else if (jobtype.contains("coord")) {
137                json = getCoordinatorJobs(request);
138            }
139            else if (jobtype.contains("bundle")) {
140                json = getBundleJobs(request);
141            }
142            return json;
143        }
144    
145        /**
146         * v1 service implementation to submit a workflow job
147         */
148        @SuppressWarnings("unchecked")
149        private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
150    
151            JSONObject json = new JSONObject();
152    
153            try {
154                String action = request.getParameter(RestConstants.ACTION_PARAM);
155                if (action != null && !action.equals(RestConstants.JOB_ACTION_START)) {
156                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
157                            RestConstants.ACTION_PARAM, action);
158                }
159                boolean startJob = (action != null);
160                String user = conf.get(OozieClient.USER_NAME);
161                DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request));
162                String id = dagEngine.submitJob(conf, startJob);
163                json.put(JsonTags.JOB_ID, id);
164            }
165            catch (DagEngineException ex) {
166                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
167            }
168    
169            return json;
170        }
171    
172        /**
173         * v1 service implementation to submit a coordinator job
174         */
175        @SuppressWarnings("unchecked")
176        private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException {
177    
178            JSONObject json = new JSONObject();
179            XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());
180            try {
181                String action = request.getParameter(RestConstants.ACTION_PARAM);
182                if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
183                        && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
184                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
185                            RestConstants.ACTION_PARAM, action);
186                }
187                boolean startJob = (action != null);
188                String user = conf.get(OozieClient.USER_NAME);
189                CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
190                        user, getAuthToken(request));
191                String id = null;
192                boolean dryrun = false;
193                if (action != null) {
194                    dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
195                }
196                if (dryrun) {
197                    id = coordEngine.dryrunSubmit(conf, startJob);
198                }
199                else {
200                    id = coordEngine.submitJob(conf, startJob);
201                }
202                json.put(JsonTags.JOB_ID, id);
203            }
204            catch (CoordinatorEngineException ex) {
205                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
206            }
207    
208            return json;
209        }
210    
211        /**
212         * v1 service implementation to submit a bundle job
213         */
214        @SuppressWarnings("unchecked")
215        private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
216            JSONObject json = new JSONObject();
217            XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());
218            try {
219                String action = request.getParameter(RestConstants.ACTION_PARAM);
220                if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
221                        && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
222                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
223                            RestConstants.ACTION_PARAM, action);
224                }
225                boolean startJob = (action != null);
226                String user = conf.get(OozieClient.USER_NAME);
227                BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user,
228                        getAuthToken(request));
229                String id = null;
230                boolean dryrun = false;
231                if (action != null) {
232                    dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
233                }
234                if (dryrun) {
235                    id = bundleEngine.dryrunSubmit(conf, startJob);
236                }
237                else {
238                    id = bundleEngine.submitJob(conf, startJob);
239                }
240                json.put(JsonTags.JOB_ID, id);
241            }
242            catch (BundleEngineException ex) {
243                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
244            }
245    
246            return json;
247        }
248    
249        /**
250         * v1 service implementation to get a JSONObject representation of a job from its external ID
251         */
252        @SuppressWarnings("unchecked")
253        private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
254                throws XServletException {
255            JSONObject json = new JSONObject();
256            try {
257                DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
258                        getAuthToken(request));
259                String jobId = dagEngine.getJobIdForExternalId(externalId);
260                json.put(JsonTags.JOB_ID, jobId);
261            }
262            catch (DagEngineException ex) {
263                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
264            }
265            return json;
266        }
267    
268        /**
269         * v1 service implementation to get a JSONObject representation of a job from its external ID
270         */
271        private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
272                throws XServletException {
273            JSONObject json = new JSONObject();
274            return json;
275        }
276    
277        /**
278         * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
279         * request object
280         */
281        private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
282            JSONObject json = new JSONObject();
283            try {
284                String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
285                String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
286                String lenStr = request.getParameter(RestConstants.LEN_PARAM);
287                int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
288                start = (start < 1) ? 1 : start;
289                int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
290                len = (len < 1) ? 50 : len;
291                DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request),
292                        getAuthToken(request));
293                WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
294                List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows();
295                json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows));
296                json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
297                json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
298                json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
299    
300            }
301            catch (DagEngineException ex) {
302                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
303            }
304    
305            return json;
306        }
307    
308        /**
309         * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
310         * request object
311         */
312        @SuppressWarnings("unchecked")
313        private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
314            JSONObject json = new JSONObject();
315            try {
316                String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
317                String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
318                String lenStr = request.getParameter(RestConstants.LEN_PARAM);
319                int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
320                start = (start < 1) ? 1 : start;
321                int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
322                len = (len < 1) ? 50 : len;
323                CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
324                        getUser(request), getAuthToken(request));
325                CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
326                List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs();
327                json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs));
328                json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
329                json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
330                json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
331    
332            }
333            catch (CoordinatorEngineException ex) {
334                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
335            }
336            return json;
337        }
338    
339        @SuppressWarnings("unchecked")
340        private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
341            JSONObject json = new JSONObject();
342            try {
343                String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
344                String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
345                String lenStr = request.getParameter(RestConstants.LEN_PARAM);
346                int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
347                start = (start < 1) ? 1 : start;
348                int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
349                len = (len < 1) ? 50 : len;
350    
351                BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
352                        getAuthToken(request));
353                BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
354                List<BundleJobBean> jsonJobs = jobs.getBundleJobs();
355    
356                json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs));
357                json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
358                json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
359                json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
360    
361            }
362            catch (BundleEngineException ex) {
363                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
364            }
365            return json;
366        }
367    
368        /**
369         * service implementation to submit a http job
370         */
371        private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
372                throws XServletException {
373            JSONObject json = new JSONObject();
374    
375            try {
376                String user = conf.get(OozieClient.USER_NAME);
377                DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, getAuthToken(request));
378                String id = dagEngine.submitHttpJob(conf, jobType);
379                json.put(JsonTags.JOB_ID, id);
380            }
381            catch (DagEngineException ex) {
382                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
383            }
384    
385            return json;
386        }
387    }