001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.servlet;
019    
020    import java.io.IOException;
021    import java.util.Arrays;
022    
023    import javax.servlet.ServletConfig;
024    import javax.servlet.ServletException;
025    import javax.servlet.http.HttpServletRequest;
026    import javax.servlet.http.HttpServletResponse;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.FileStatus;
030    import org.apache.hadoop.fs.FileSystem;
031    import org.apache.hadoop.fs.Path;
032    import org.apache.oozie.BaseEngineException;
033    import org.apache.oozie.ErrorCode;
034    import org.apache.oozie.client.OozieClient;
035    import org.apache.oozie.client.XOozieClient;
036    import org.apache.oozie.client.rest.JsonBean;
037    import org.apache.oozie.client.rest.RestConstants;
038    import org.apache.oozie.service.AuthorizationException;
039    import org.apache.oozie.service.AuthorizationService;
040    import org.apache.oozie.service.HadoopAccessorException;
041    import org.apache.oozie.service.HadoopAccessorService;
042    import org.apache.oozie.service.Services;
043    import org.apache.oozie.service.XLogService;
044    import org.apache.oozie.util.ConfigUtils;
045    import org.apache.oozie.util.JobUtils;
046    import org.apache.oozie.util.XConfiguration;
047    import org.apache.oozie.util.XLog;
048    import org.json.simple.JSONObject;
049    
050    public abstract class BaseJobServlet extends JsonRestServlet {
051    
052        private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
053    
054        static {
055            RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
056                    RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
057                    RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET"))));
058        }
059    
060        public BaseJobServlet(String instrumentationName) {
061            super(instrumentationName, RESOURCES_INFO);
062        }
063    
064        /**
065         * Perform various job related actions - start, suspend, resume, kill, etc.
066         */
067        @Override
068        protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
069            String jobId = getResourceName(request);
070            request.setAttribute(AUDIT_PARAM, jobId);
071            request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
072            try {
073                AuthorizationService auth = Services.get().get(AuthorizationService.class);
074                auth.authorizeForJob(getUser(request), jobId, true);
075            }
076            catch (AuthorizationException ex) {
077                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
078            }
079    
080            String action = request.getParameter(RestConstants.ACTION_PARAM);
081            if (action.equals(RestConstants.JOB_ACTION_START)) {
082                stopCron();
083                startJob(request, response);
084                startCron();
085                response.setStatus(HttpServletResponse.SC_OK);
086            }
087            else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
088                stopCron();
089                resumeJob(request, response);
090                startCron();
091                response.setStatus(HttpServletResponse.SC_OK);
092            }
093            else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
094                stopCron();
095                suspendJob(request, response);
096                startCron();
097                response.setStatus(HttpServletResponse.SC_OK);
098            }
099            else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
100                stopCron();
101                killJob(request, response);
102                startCron();
103                response.setStatus(HttpServletResponse.SC_OK);
104            }
105            else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) {
106                stopCron();
107                changeJob(request, response);
108                startCron();
109                response.setStatus(HttpServletResponse.SC_OK);
110            }
111            else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
112                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
113                Configuration conf = new XConfiguration(request.getInputStream());
114                stopCron();
115                checkAuthorizationForApp(getUser(request), conf);
116                JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
117                reRunJob(request, response, conf);
118                startCron();
119                response.setStatus(HttpServletResponse.SC_OK);
120            }
121            else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
122                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
123                stopCron();
124                JSONObject json = reRunJob(request, response, null);
125                startCron();
126                if (json != null) {
127                    sendJsonResponse(response, HttpServletResponse.SC_OK, json);
128                }
129                else {
130                    response.setStatus(HttpServletResponse.SC_OK);
131                }
132            }
133            else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) {
134                validateContentType(request, RestConstants.XML_CONTENT_TYPE);
135                stopCron();
136                JSONObject json = reRunJob(request, response, null);
137                startCron();
138                if (json != null) {
139                    sendJsonResponse(response, HttpServletResponse.SC_OK, json);
140                }
141                else {
142                    response.setStatus(HttpServletResponse.SC_OK);
143                }
144            }
145            else {
146                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
147                        RestConstants.ACTION_PARAM, action);
148            }
149        }
150    
151        /**
152         * Validate the configuration user/group. <p/>
153         *
154         * @param requestUser user in request.
155         * @param conf configuration.
156         * @throws XServletException thrown if the configuration does not have a property {@link
157         * org.apache.oozie.client.OozieClient#USER_NAME}.
158         */
159        static void checkAuthorizationForApp(String requestUser, Configuration conf) throws XServletException {
160            String user = conf.get(OozieClient.USER_NAME);
161            String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null);
162            try {
163                if (user == null) {
164                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
165                }
166                if (!requestUser.equals(UNDEF) && !user.equals(requestUser)) {
167                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0400, requestUser, user);
168                }
169                AuthorizationService auth = Services.get().get(AuthorizationService.class);
170    
171                if (acl == null && auth.useDefaultGroupAsAcl()) {
172                    acl = auth.getDefaultGroup(user);
173                    conf.set(OozieClient.GROUP_NAME, acl);
174                }
175                XLog.Info.get().setParameter(XLogService.GROUP, acl);
176                String wfPath = conf.get(OozieClient.APP_PATH);
177                String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
178                String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
179    
180                if (wfPath == null && coordPath == null && bundlePath == null) {
181                    String libPath = conf.get(XOozieClient.LIBPATH);
182                    conf.set(OozieClient.APP_PATH, libPath);
183                    wfPath = libPath;
184                }
185                ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
186    
187                if (wfPath != null) {
188                    auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf);
189                }
190                else if (coordPath != null){
191                    auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf);
192                }
193                else if (bundlePath != null){
194                    auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf);
195                }
196            }
197            catch (AuthorizationException ex) {
198                XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex);
199                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
200            }
201        }
202    
203        /**
204         * Return information about jobs.
205         */
206        @Override
207        public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
208            String jobId = getResourceName(request);
209            String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
210    
211            try {
212                AuthorizationService auth = Services.get().get(AuthorizationService.class);
213                auth.authorizeForJob(getUser(request), jobId, false);
214            }
215            catch (AuthorizationException ex) {
216                throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
217            }
218    
219            if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
220                stopCron();
221                JsonBean job = null;
222                try {
223                    job = getJob(request, response);
224                }
225                catch (BaseEngineException e) {
226                    // TODO Auto-generated catch block
227                    // e.printStackTrace();
228    
229                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
230                }
231                startCron();
232                sendJsonResponse(response, HttpServletResponse.SC_OK, job);
233            }
234            else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
235                response.setContentType(TEXT_UTF8);
236                streamJobLog(request, response);
237            }
238            else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
239                stopCron();
240                response.setContentType(XML_UTF8);
241                String wfDefinition = getJobDefinition(request, response);
242                startCron();
243                response.setStatus(HttpServletResponse.SC_OK);
244                response.getWriter().write(wfDefinition);
245            }
246            else {
247                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
248                        RestConstants.JOB_SHOW_PARAM, show);
249            }
250        }
251    
252        /**
253         * abstract method to start a job, either workflow or coordinator
254         *
255         * @param request
256         * @param response
257         * @throws XServletException
258         * @throws IOException TODO
259         */
260        abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
261                IOException;
262    
263        /**
264         * abstract method to resume a job, either workflow or coordinator
265         *
266         * @param request
267         * @param response
268         * @throws XServletException
269         * @throws IOException TODO
270         */
271        abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
272                IOException;
273    
274        /**
275         * abstract method to suspend a job, either workflow or coordinator
276         *
277         * @param request
278         * @param response
279         * @throws XServletException
280         * @throws IOException TODO
281         */
282        abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
283                IOException;
284    
285        /**
286         * abstract method to kill a job, either workflow or coordinator
287         *
288         * @param request
289         * @param response
290         * @throws XServletException
291         * @throws IOException TODO
292         */
293        abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
294                IOException;
295    
296        /**
297         * abstract method to change a coordinator job
298         *
299         * @param request
300         * @param response
301         * @throws XServletException
302         * @throws IOException TODO
303         */
304        abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
305                IOException;
306    
307        /**
308         * abstract method to re-run a job, either workflow or coordinator
309         *
310         * @param request
311         * @param response
312         * @param conf
313         * @throws XServletException
314         * @throws IOException TODO
315         */
316        abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
317                throws XServletException, IOException;
318    
319        /**
320         * abstract method to get a job, either workflow or coordinator, in JsonBean representation
321         *
322         * @param request
323         * @param response
324         * @return JsonBean representation of a job, either workflow or coordinator
325         * @throws XServletException
326         * @throws IOException TODO
327         * @throws BaseEngineException
328         */
329        abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
330                IOException, BaseEngineException;
331    
332        /**
333         * abstract method to get definition of a job, either workflow or coordinator
334         *
335         * @param request
336         * @param response
337         * @return job, either workflow or coordinator, definition in string format
338         * @throws XServletException
339         * @throws IOException TODO
340         */
341        abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
342                throws XServletException, IOException;
343    
344        /**
345         * abstract method to get and stream log information of job, either workflow or coordinator
346         *
347         * @param request
348         * @param response
349         * @throws XServletException
350         * @throws IOException
351         */
352        abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
353                IOException;
354    
355    }