001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.Arrays;
022    
023    import javax.servlet.ServletException;
024    import javax.servlet.http.HttpServletRequest;
025    import javax.servlet.http.HttpServletResponse;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.oozie.BaseEngineException;
029    import org.apache.oozie.ErrorCode;
030    import org.apache.oozie.client.OozieClient;
031    import org.apache.oozie.client.XOozieClient;
032    import org.apache.oozie.client.rest.JsonBean;
033    import org.apache.oozie.client.rest.JsonTags;
034    import org.apache.oozie.client.rest.RestConstants;
035    import org.apache.oozie.service.AuthorizationException;
036    import org.apache.oozie.service.AuthorizationService;
037    import org.apache.oozie.service.Services;
038    import org.apache.oozie.service.XLogService;
039    import org.apache.oozie.util.ConfigUtils;
040    import org.apache.oozie.util.JobUtils;
041    import org.apache.oozie.util.XConfiguration;
042    import org.apache.oozie.util.XLog;
043    import org.json.simple.JSONObject;
044    
045    public abstract class BaseJobServlet extends JsonRestServlet {
046    
047        private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
048    
049        static {
050            RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
051                    RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
052                    RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")), new ParameterInfo(
053                            RestConstants.ORDER_PARAM, String.class, false, Arrays.asList("GET"))));
054        }
055    
056        public BaseJobServlet(String instrumentationName) {
057            super(instrumentationName, RESOURCES_INFO);
058        }
059    
060        /**
061         * Perform various job related actions - start, suspend, resume, kill, etc.
062         */
063        @Override
064        protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
065            String jobId = getResourceName(request);
066            request.setAttribute(AUDIT_PARAM, jobId);
067            request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
068            try {
069                AuthorizationService auth = Services.get().get(AuthorizationService.class);
070                auth.authorizeForJob(getUser(request), jobId, true);
071            }
072            catch (AuthorizationException ex) {
073                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
074            }
075    
076            String action = request.getParameter(RestConstants.ACTION_PARAM);
077            if (action.equals(RestConstants.JOB_ACTION_START)) {
078                stopCron();
079                startJob(request, response);
080                startCron();
081                response.setStatus(HttpServletResponse.SC_OK);
082            }
083            else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
084                stopCron();
085                resumeJob(request, response);
086                startCron();
087                response.setStatus(HttpServletResponse.SC_OK);
088            }
089            else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
090                stopCron();
091                suspendJob(request, response);
092                startCron();
093                response.setStatus(HttpServletResponse.SC_OK);
094            }
095            else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
096                stopCron();
097                killJob(request, response);
098                startCron();
099                response.setStatus(HttpServletResponse.SC_OK);
100            }
101            else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) {
102                stopCron();
103                changeJob(request, response);
104                startCron();
105                response.setStatus(HttpServletResponse.SC_OK);
106            }
107            else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
108                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
109                Configuration conf = new XConfiguration(request.getInputStream());
110                stopCron();
111                String requestUser = getUser(request);
112                if (!requestUser.equals(UNDEF)) {
113                    conf.set(OozieClient.USER_NAME, requestUser);
114                }
115                BaseJobServlet.checkAuthorizationForApp(conf);
116                JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
117                reRunJob(request, response, conf);
118                startCron();
119                response.setStatus(HttpServletResponse.SC_OK);
120            }
121            else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
122                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
123                stopCron();
124                JSONObject json = reRunJob(request, response, null);
125                startCron();
126                if (json != null) {
127                    sendJsonResponse(response, HttpServletResponse.SC_OK, json);
128                }
129                else {
130                    response.setStatus(HttpServletResponse.SC_OK);
131                }
132            }
133            else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) {
134                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
135                stopCron();
136                JSONObject json = reRunJob(request, response, null);
137                startCron();
138                if (json != null) {
139                    sendJsonResponse(response, HttpServletResponse.SC_OK, json);
140                }
141                else {
142                    response.setStatus(HttpServletResponse.SC_OK);
143                }
144            }
145            else {
146                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
147                        RestConstants.ACTION_PARAM, action);
148            }
149        }
150    
151        /**
152         * Validate the configuration user/group. <p/>
153         *
154         * @param conf configuration.
155         * @throws XServletException thrown if the configuration does not have a property {@link
156         * org.apache.oozie.client.OozieClient#USER_NAME}.
157         */
158        static void checkAuthorizationForApp(Configuration conf) throws XServletException {
159            String user = conf.get(OozieClient.USER_NAME);
160            String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null);
161            try {
162                if (user == null) {
163                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
164                }
165                AuthorizationService auth = Services.get().get(AuthorizationService.class);
166    
167                if (acl != null){
168                     conf.set(OozieClient.GROUP_NAME, acl);
169                }
170                else if (acl == null && auth.useDefaultGroupAsAcl()) {
171                    acl = auth.getDefaultGroup(user);
172                    conf.set(OozieClient.GROUP_NAME, acl);
173                }
174                XLog.Info.get().setParameter(XLogService.GROUP, acl);
175                String wfPath = conf.get(OozieClient.APP_PATH);
176                String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
177                String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
178    
179                if (wfPath == null && coordPath == null && bundlePath == null) {
180                    String[] libPaths = conf.getStrings(XOozieClient.LIBPATH);
181                    if (libPaths != null && libPaths.length > 0 && libPaths[0].trim().length() > 0) {
182                        conf.set(OozieClient.APP_PATH, libPaths[0].trim());
183                        wfPath = libPaths[0].trim();
184                    }
185                    else {
186                        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0405);
187                    }
188                }
189                ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
190    
191                if (wfPath != null) {
192                    auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf);
193                }
194                else if (coordPath != null){
195                    auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf);
196                }
197                else if (bundlePath != null){
198                    auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf);
199                }
200            }
201            catch (AuthorizationException ex) {
202                XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex);
203                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
204            }
205        }
206    
207        /**
208         * Return information about jobs.
209         */
210        @Override
211        public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
212            String jobId = getResourceName(request);
213            String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
214            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
215                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
216    
217            try {
218                AuthorizationService auth = Services.get().get(AuthorizationService.class);
219                auth.authorizeForJob(getUser(request), jobId, false);
220            }
221            catch (AuthorizationException ex) {
222                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
223            }
224    
225            if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
226                stopCron();
227                JsonBean job = null;
228                try {
229                    job = getJob(request, response);
230                }
231                catch (BaseEngineException e) {
232                    // TODO Auto-generated catch block
233                    // e.printStackTrace();
234    
235                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
236                }
237                startCron();
238                sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId);
239            }
240    
241            else if (show.equals(RestConstants.JOB_SHOW_JMS_TOPIC)) {
242                stopCron();
243                String jmsTopicName = getJMSTopicName(request, response);
244                JSONObject json = new JSONObject();
245                json.put(JsonTags.JMS_TOPIC_NAME, jmsTopicName);
246                startCron();
247                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
248            }
249    
250            else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
251                response.setContentType(TEXT_UTF8);
252                streamJobLog(request, response);
253            }
254            else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
255                stopCron();
256                response.setContentType(XML_UTF8);
257                String wfDefinition = getJobDefinition(request, response);
258                startCron();
259                response.setStatus(HttpServletResponse.SC_OK);
260                response.getWriter().write(wfDefinition);
261            }
262            else if (show.equals(RestConstants.JOB_SHOW_GRAPH)) {
263                stopCron();
264                streamJobGraph(request, response);
265                startCron(); // -- should happen before you stream anything in response?
266            }
267            else {
268                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
269                        RestConstants.JOB_SHOW_PARAM, show);
270            }
271        }
272    
273        /**
274         * abstract method to start a job, either workflow or coordinator
275         *
276         * @param request
277         * @param response
278         * @throws XServletException
279         * @throws IOException TODO
280         */
281        abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
282                IOException;
283    
284        /**
285         * abstract method to resume a job, either workflow or coordinator
286         *
287         * @param request
288         * @param response
289         * @throws XServletException
290         * @throws IOException TODO
291         */
292        abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
293                IOException;
294    
295        /**
296         * abstract method to suspend a job, either workflow or coordinator
297         *
298         * @param request
299         * @param response
300         * @throws XServletException
301         * @throws IOException TODO
302         */
303        abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
304                IOException;
305    
306        /**
307         * abstract method to kill a job, either workflow or coordinator
308         *
309         * @param request
310         * @param response
311         * @throws XServletException
312         * @throws IOException TODO
313         */
314        abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
315                IOException;
316    
317        /**
318         * abstract method to change a coordinator job
319         *
320         * @param request
321         * @param response
322         * @throws XServletException
323         * @throws IOException TODO
324         */
325        abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
326                IOException;
327    
328        /**
329         * abstract method to re-run a job, either workflow or coordinator
330         *
331         * @param request
332         * @param response
333         * @param conf
334         * @throws XServletException
335         * @throws IOException TODO
336         */
337        abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
338                throws XServletException, IOException;
339    
340        /**
341         * abstract method to get a job, either workflow or coordinator, in JsonBean representation
342         *
343         * @param request
344         * @param response
345         * @return JsonBean representation of a job, either workflow or coordinator
346         * @throws XServletException
347         * @throws IOException TODO
348         * @throws BaseEngineException
349         */
350        abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
351                IOException, BaseEngineException;
352    
353        /**
354         * abstract method to get definition of a job, either workflow or coordinator
355         *
356         * @param request
357         * @param response
358         * @return job, either workflow or coordinator, definition in string format
359         * @throws XServletException
360         * @throws IOException TODO
361         */
362        abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
363                throws XServletException, IOException;
364    
365        /**
366         * abstract method to get and stream log information of job, either workflow or coordinator
367         *
368         * @param request
369         * @param response
370         * @throws XServletException
371         * @throws IOException
372         */
373        abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
374                IOException;
375    
376        /**
377         * abstract method to create and stream image for runtime DAG -- workflow only
378         *
379         * @param request
380         * @param response
381         * @throws XServletException
382         * @throws IOException
383         */
384        abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
385                throws XServletException, IOException;
386    
387        /**
388         * abstract method to get JMS topic name for a job
389         * @param request
390         * @param response
391         * @throws XServletException
392         * @throws IOException
393         */
394        abstract String getJMSTopicName(HttpServletRequest request, HttpServletResponse response)
395                throws XServletException, IOException;
396    }