001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.Arrays;
022    
023    import javax.servlet.ServletException;
024    import javax.servlet.http.HttpServletRequest;
025    import javax.servlet.http.HttpServletResponse;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.hadoop.fs.FileStatus;
029    import org.apache.hadoop.fs.FileSystem;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.oozie.BaseEngineException;
032    import org.apache.oozie.ErrorCode;
033    import org.apache.oozie.client.OozieClient;
034    import org.apache.oozie.client.XOozieClient;
035    import org.apache.oozie.client.rest.JsonBean;
036    import org.apache.oozie.client.rest.RestConstants;
037    import org.apache.oozie.service.AuthorizationException;
038    import org.apache.oozie.service.AuthorizationService;
039    import org.apache.oozie.service.HadoopAccessorException;
040    import org.apache.oozie.service.HadoopAccessorService;
041    import org.apache.oozie.service.Services;
042    import org.apache.oozie.service.XLogService;
043    import org.apache.oozie.util.JobUtils;
044    import org.apache.oozie.util.XConfiguration;
045    import org.apache.oozie.util.XLog;
046    import org.json.simple.JSONObject;
047    
048    public abstract class BaseJobServlet extends JsonRestServlet {
049    
050        private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
051    
052        static {
053            RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
054                    RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
055                    RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET"))));
056        }
057    
058        public BaseJobServlet(String instrumentationName) {
059            super(instrumentationName, RESOURCES_INFO);
060        }
061    
062        /**
063         * Perform various job related actions - start, suspend, resume, kill, etc.
064         */
065        @Override
066        protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
067            String jobId = getResourceName(request);
068            request.setAttribute(AUDIT_PARAM, jobId);
069            request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
070            try {
071                AuthorizationService auth = Services.get().get(AuthorizationService.class);
072                auth.authorizeForJob(getUser(request), jobId, true);
073            }
074            catch (AuthorizationException ex) {
075                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
076            }
077    
078            String action = request.getParameter(RestConstants.ACTION_PARAM);
079            if (action.equals(RestConstants.JOB_ACTION_START)) {
080                stopCron();
081                startJob(request, response);
082                startCron();
083                response.setStatus(HttpServletResponse.SC_OK);
084            }
085            else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
086                stopCron();
087                resumeJob(request, response);
088                startCron();
089                response.setStatus(HttpServletResponse.SC_OK);
090            }
091            else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
092                stopCron();
093                suspendJob(request, response);
094                startCron();
095                response.setStatus(HttpServletResponse.SC_OK);
096            }
097            else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
098                stopCron();
099                killJob(request, response);
100                startCron();
101                response.setStatus(HttpServletResponse.SC_OK);
102            }
103            else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) {
104                stopCron();
105                changeJob(request, response);
106                startCron();
107                response.setStatus(HttpServletResponse.SC_OK);
108            }
109            else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
110                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
111                Configuration conf = new XConfiguration(request.getInputStream());
112                stopCron();
113                checkAuthorizationForApp(getUser(request), conf);
114                JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
115                reRunJob(request, response, conf);
116                startCron();
117                response.setStatus(HttpServletResponse.SC_OK);
118            }
119            else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
120                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
121                stopCron();
122                JSONObject json = reRunJob(request, response, null);
123                startCron();
124                if (json != null) {
125                    sendJsonResponse(response, HttpServletResponse.SC_OK, json);
126                }
127                else {
128                    response.setStatus(HttpServletResponse.SC_OK);
129                }
130            }
131            else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) {
132                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
133                stopCron();
134                JSONObject json = reRunJob(request, response, null);
135                startCron();
136                if (json != null) {
137                    sendJsonResponse(response, HttpServletResponse.SC_OK, json);
138                }
139                else {
140                    response.setStatus(HttpServletResponse.SC_OK);
141                }
142            }
143            else {
144                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
145                        RestConstants.ACTION_PARAM, action);
146            }
147        }
148    
149        /**
150         * Validate the configuration user/group. <p/>
151         *
152         * @param requestUser user in request.
153         * @param conf configuration.
154         * @throws XServletException thrown if the configuration does not have a property {@link
155         * org.apache.oozie.client.OozieClient#USER_NAME}.
156         */
157        static void checkAuthorizationForApp(String requestUser, Configuration conf) throws XServletException {
158            String user = conf.get(OozieClient.USER_NAME);
159            String group = conf.get(OozieClient.GROUP_NAME);
160            try {
161                if (user == null) {
162                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
163                }
164                if (!requestUser.equals(UNDEF) && !user.equals(requestUser)) {
165                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0400, requestUser, user);
166                }
167                AuthorizationService auth = Services.get().get(AuthorizationService.class);
168                if (group == null) {
169                    group = auth.getDefaultGroup(user);
170                    conf.set(OozieClient.GROUP_NAME, group);
171                }
172                else {
173                    auth.authorizeForGroup(user, group);
174                }
175                XLog.Info.get().setParameter(XLogService.GROUP, group);
176                String wfPath = conf.get(OozieClient.APP_PATH);
177                String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
178                String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
179    
180                if (wfPath == null && coordPath == null && bundlePath == null) {
181                    String libPath = conf.get(XOozieClient.LIBPATH);
182                    conf.set(OozieClient.APP_PATH, libPath);
183                    wfPath = libPath;
184                }
185                ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
186    
187                if (wfPath != null) {
188                    auth.authorizeForApp(user, group, wfPath, "workflow.xml", conf);
189                }
190                else if (coordPath != null){
191                    auth.authorizeForApp(user, group, coordPath, "coordinator.xml", conf);
192                }
193                else if (bundlePath != null){
194                    auth.authorizeForApp(user, group, bundlePath, "bundle.xml", conf);
195                }
196            }
197            catch (AuthorizationException ex) {
198                XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex);
199                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
200            }
201        }
202    
203        /**
204         * Return information about jobs.
205         */
206        @Override
207        public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
208            String jobId = getResourceName(request);
209            String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
210    
211            try {
212                AuthorizationService auth = Services.get().get(AuthorizationService.class);
213                auth.authorizeForJob(getUser(request), jobId, false);
214            }
215            catch (AuthorizationException ex) {
216                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
217            }
218    
219            if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
220                stopCron();
221                JsonBean job = null;
222                try {
223                    job = getJob(request, response);
224                }
225                catch (BaseEngineException e) {
226                    // TODO Auto-generated catch block
227                    // e.printStackTrace();
228    
229                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
230                }
231                startCron();
232                sendJsonResponse(response, HttpServletResponse.SC_OK, job);
233            }
234            else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
235                response.setContentType(TEXT_UTF8);
236                streamJobLog(request, response);
237            }
238            else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
239                stopCron();
240                response.setContentType(XML_UTF8);
241                String wfDefinition = getJobDefinition(request, response);
242                startCron();
243                response.setStatus(HttpServletResponse.SC_OK);
244                response.getWriter().write(wfDefinition);
245            }
246            else {
247                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
248                        RestConstants.JOB_SHOW_PARAM, show);
249            }
250        }
251    
252        /**
253         * abstract method to start a job, either workflow or coordinator
254         *
255         * @param request
256         * @param response
257         * @throws XServletException
258         * @throws IOException TODO
259         */
260        abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
261                IOException;
262    
263        /**
264         * abstract method to resume a job, either workflow or coordinator
265         *
266         * @param request
267         * @param response
268         * @throws XServletException
269         * @throws IOException TODO
270         */
271        abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
272                IOException;
273    
274        /**
275         * abstract method to suspend a job, either workflow or coordinator
276         *
277         * @param request
278         * @param response
279         * @throws XServletException
280         * @throws IOException TODO
281         */
282        abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
283                IOException;
284    
285        /**
286         * abstract method to kill a job, either workflow or coordinator
287         *
288         * @param request
289         * @param response
290         * @throws XServletException
291         * @throws IOException TODO
292         */
293        abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
294                IOException;
295    
296        /**
297         * abstract method to change a coordinator job
298         *
299         * @param request
300         * @param response
301         * @throws XServletException
302         * @throws IOException TODO
303         */
304        abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
305                IOException;
306    
307        /**
308         * abstract method to re-run a job, either workflow or coordinator
309         *
310         * @param request
311         * @param response
312         * @param conf
313         * @throws XServletException
314         * @throws IOException TODO
315         */
316        abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
317                throws XServletException, IOException;
318    
319        /**
320         * abstract method to get a job, either workflow or coordinator, in JsonBean representation
321         *
322         * @param request
323         * @param response
324         * @return JsonBean representation of a job, either workflow or coordinator
325         * @throws XServletException
326         * @throws IOException TODO
327         * @throws BaseEngineException
328         */
329        abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
330                IOException, BaseEngineException;
331    
332        /**
333         * abstract method to get definition of a job, either workflow or coordinator
334         *
335         * @param request
336         * @param response
337         * @return job, either workflow or coordinator, definition in string format
338         * @throws XServletException
339         * @throws IOException TODO
340         */
341        abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
342                throws XServletException, IOException;
343    
344        /**
345         * abstract method to get and stream log information of job, either workflow or coordinator
346         *
347         * @param request
348         * @param response
349         * @throws XServletException
350         * @throws IOException
351         */
352        abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
353                IOException;
354    
355    }