001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.Arrays;
022    
023    import javax.servlet.ServletException;
024    import javax.servlet.http.HttpServletRequest;
025    import javax.servlet.http.HttpServletResponse;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.oozie.BaseEngineException;
029    import org.apache.oozie.ErrorCode;
030    import org.apache.oozie.client.OozieClient;
031    import org.apache.oozie.client.XOozieClient;
032    import org.apache.oozie.client.rest.JsonBean;
033    import org.apache.oozie.client.rest.RestConstants;
034    import org.apache.oozie.service.AuthorizationException;
035    import org.apache.oozie.service.AuthorizationService;
036    import org.apache.oozie.service.Services;
037    import org.apache.oozie.service.XLogService;
038    import org.apache.oozie.util.ConfigUtils;
039    import org.apache.oozie.util.JobUtils;
040    import org.apache.oozie.util.XConfiguration;
041    import org.apache.oozie.util.XLog;
042    import org.json.simple.JSONObject;
043    
044    public abstract class BaseJobServlet extends JsonRestServlet {
045    
046        private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
047    
048        static {
049            RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
050                    RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
051                    RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET"))));
052        }
053    
054        public BaseJobServlet(String instrumentationName) {
055            super(instrumentationName, RESOURCES_INFO);
056        }
057    
058        /**
059         * Perform various job related actions - start, suspend, resume, kill, etc.
060         */
061        @Override
062        protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
063            String jobId = getResourceName(request);
064            request.setAttribute(AUDIT_PARAM, jobId);
065            request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
066            try {
067                AuthorizationService auth = Services.get().get(AuthorizationService.class);
068                auth.authorizeForJob(getUser(request), jobId, true);
069            }
070            catch (AuthorizationException ex) {
071                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
072            }
073    
074            String action = request.getParameter(RestConstants.ACTION_PARAM);
075            if (action.equals(RestConstants.JOB_ACTION_START)) {
076                stopCron();
077                startJob(request, response);
078                startCron();
079                response.setStatus(HttpServletResponse.SC_OK);
080            }
081            else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
082                stopCron();
083                resumeJob(request, response);
084                startCron();
085                response.setStatus(HttpServletResponse.SC_OK);
086            }
087            else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
088                stopCron();
089                suspendJob(request, response);
090                startCron();
091                response.setStatus(HttpServletResponse.SC_OK);
092            }
093            else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
094                stopCron();
095                killJob(request, response);
096                startCron();
097                response.setStatus(HttpServletResponse.SC_OK);
098            }
099            else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) {
100                stopCron();
101                changeJob(request, response);
102                startCron();
103                response.setStatus(HttpServletResponse.SC_OK);
104            }
105            else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
106                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
107                Configuration conf = new XConfiguration(request.getInputStream());
108                stopCron();
109                String requestUser = getUser(request);
110                if (!requestUser.equals(UNDEF)) {
111                    conf.set(OozieClient.USER_NAME, requestUser);
112                }
113                BaseJobServlet.checkAuthorizationForApp(conf);
114                JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
115                reRunJob(request, response, conf);
116                startCron();
117                response.setStatus(HttpServletResponse.SC_OK);
118            }
119            else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
120                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
121                stopCron();
122                JSONObject json = reRunJob(request, response, null);
123                startCron();
124                if (json != null) {
125                    sendJsonResponse(response, HttpServletResponse.SC_OK, json);
126                }
127                else {
128                    response.setStatus(HttpServletResponse.SC_OK);
129                }
130            }
131            else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) {
132                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
133                stopCron();
134                JSONObject json = reRunJob(request, response, null);
135                startCron();
136                if (json != null) {
137                    sendJsonResponse(response, HttpServletResponse.SC_OK, json);
138                }
139                else {
140                    response.setStatus(HttpServletResponse.SC_OK);
141                }
142            }
143            else {
144                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
145                        RestConstants.ACTION_PARAM, action);
146            }
147        }
148    
149        /**
150         * Validate the configuration user/group. <p/>
151         *
152         * @param conf configuration.
153         * @throws XServletException thrown if the configuration does not have a property {@link
154         * org.apache.oozie.client.OozieClient#USER_NAME}.
155         */
156        static void checkAuthorizationForApp(Configuration conf) throws XServletException {
157            String user = conf.get(OozieClient.USER_NAME);
158            String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null);
159            try {
160                if (user == null) {
161                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
162                }
163                AuthorizationService auth = Services.get().get(AuthorizationService.class);
164    
165                if (acl != null){
166                     conf.set(OozieClient.GROUP_NAME, acl);
167                }
168                else if (acl == null && auth.useDefaultGroupAsAcl()) {
169                    acl = auth.getDefaultGroup(user);
170                    conf.set(OozieClient.GROUP_NAME, acl);
171                }
172                XLog.Info.get().setParameter(XLogService.GROUP, acl);
173                String wfPath = conf.get(OozieClient.APP_PATH);
174                String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
175                String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
176    
177                if (wfPath == null && coordPath == null && bundlePath == null) {
178                    String[] libPaths = conf.getStrings(XOozieClient.LIBPATH);
179                    if (libPaths != null && libPaths.length > 0 && libPaths[0].trim().length() > 0) {
180                        conf.set(OozieClient.APP_PATH, libPaths[0].trim());
181                        wfPath = libPaths[0].trim();
182                    }
183                    else {
184                        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0405);
185                    }
186                }
187                ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
188    
189                if (wfPath != null) {
190                    auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf);
191                }
192                else if (coordPath != null){
193                    auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf);
194                }
195                else if (bundlePath != null){
196                    auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf);
197                }
198            }
199            catch (AuthorizationException ex) {
200                XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex);
201                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
202            }
203        }
204    
205        /**
206         * Return information about jobs.
207         */
208        @Override
209        public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
210            String jobId = getResourceName(request);
211            String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
212            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
213                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
214    
215            try {
216                AuthorizationService auth = Services.get().get(AuthorizationService.class);
217                auth.authorizeForJob(getUser(request), jobId, false);
218            }
219            catch (AuthorizationException ex) {
220                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
221            }
222    
223            if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
224                stopCron();
225                JsonBean job = null;
226                try {
227                    job = getJob(request, response);
228                }
229                catch (BaseEngineException e) {
230                    // TODO Auto-generated catch block
231                    // e.printStackTrace();
232    
233                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
234                }
235                startCron();
236                sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId);
237            }
238            else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
239                response.setContentType(TEXT_UTF8);
240                streamJobLog(request, response);
241            }
242            else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
243                stopCron();
244                response.setContentType(XML_UTF8);
245                String wfDefinition = getJobDefinition(request, response);
246                startCron();
247                response.setStatus(HttpServletResponse.SC_OK);
248                response.getWriter().write(wfDefinition);
249            }
250            else if (show.equals(RestConstants.JOB_SHOW_GRAPH)) {
251                stopCron();
252                streamJobGraph(request, response);
253                startCron(); // -- should happen before you stream anything in response?
254            }
255            else {
256                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
257                        RestConstants.JOB_SHOW_PARAM, show);
258            }
259        }
260    
261        /**
262         * abstract method to start a job, either workflow or coordinator
263         *
264         * @param request
265         * @param response
266         * @throws XServletException
267         * @throws IOException TODO
268         */
269        abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
270                IOException;
271    
272        /**
273         * abstract method to resume a job, either workflow or coordinator
274         *
275         * @param request
276         * @param response
277         * @throws XServletException
278         * @throws IOException TODO
279         */
280        abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
281                IOException;
282    
283        /**
284         * abstract method to suspend a job, either workflow or coordinator
285         *
286         * @param request
287         * @param response
288         * @throws XServletException
289         * @throws IOException TODO
290         */
291        abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
292                IOException;
293    
294        /**
295         * abstract method to kill a job, either workflow or coordinator
296         *
297         * @param request
298         * @param response
299         * @throws XServletException
300         * @throws IOException TODO
301         */
302        abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
303                IOException;
304    
305        /**
306         * abstract method to change a coordinator job
307         *
308         * @param request
309         * @param response
310         * @throws XServletException
311         * @throws IOException TODO
312         */
313        abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
314                IOException;
315    
316        /**
317         * abstract method to re-run a job, either workflow or coordinator
318         *
319         * @param request
320         * @param response
321         * @param conf
322         * @throws XServletException
323         * @throws IOException TODO
324         */
325        abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
326                throws XServletException, IOException;
327    
328        /**
329         * abstract method to get a job, either workflow or coordinator, in JsonBean representation
330         *
331         * @param request
332         * @param response
333         * @return JsonBean representation of a job, either workflow or coordinator
334         * @throws XServletException
335         * @throws IOException TODO
336         * @throws BaseEngineException
337         */
338        abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
339                IOException, BaseEngineException;
340    
341        /**
342         * abstract method to get definition of a job, either workflow or coordinator
343         *
344         * @param request
345         * @param response
346         * @return job, either workflow or coordinator, definition in string format
347         * @throws XServletException
348         * @throws IOException TODO
349         */
350        abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
351                throws XServletException, IOException;
352    
353        /**
354         * abstract method to get and stream log information of job, either workflow or coordinator
355         *
356         * @param request
357         * @param response
358         * @throws XServletException
359         * @throws IOException
360         */
361        abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
362                IOException;
363    
364        /**
365         * abstract method to create and stream image for runtime DAG -- workflow only
366         *
367         * @param request
368         * @param response
369         * @throws XServletException
370         * @throws IOException
371         */
372        abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
373                throws XServletException, IOException;
374    }