001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.servlet;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025
026import javax.servlet.http.HttpServletRequest;
027import javax.servlet.http.HttpServletResponse;
028
029import org.apache.commons.lang.StringUtils;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.oozie.BaseEngine;
032import org.apache.oozie.BaseEngineException;
033import org.apache.oozie.BundleEngine;
034import org.apache.oozie.CoordinatorActionBean;
035import org.apache.oozie.CoordinatorActionInfo;
036import org.apache.oozie.CoordinatorEngine;
037import org.apache.oozie.CoordinatorEngineException;
038import org.apache.oozie.CoordinatorWfActionBean;
039import org.apache.oozie.WorkflowActionBean;
040import org.apache.oozie.DagEngine;
041import org.apache.oozie.DagEngineException;
042import org.apache.oozie.ErrorCode;
043import org.apache.oozie.client.CoordinatorAction;
044import org.apache.oozie.client.rest.JsonBean;
045import org.apache.oozie.client.rest.JsonTags;
046import org.apache.oozie.client.rest.RestConstants;
047import org.apache.oozie.command.CommandException;
048import org.apache.oozie.command.coord.CoordCommandUtils;
049import org.apache.oozie.command.wf.ActionXCommand;
050import org.apache.oozie.dependency.ActionDependency;
051import org.apache.oozie.service.BundleEngineService;
052import org.apache.oozie.service.CoordinatorEngineService;
053import org.apache.oozie.service.DagEngineService;
054import org.apache.oozie.service.Services;
055import org.apache.oozie.service.ConfigurationService;
056import org.apache.oozie.util.Pair;
057import org.json.simple.JSONArray;
058import org.json.simple.JSONObject;
059
060@SuppressWarnings("serial")
061public class V2JobServlet extends V1JobServlet {
062
063    private static final String INSTRUMENTATION_NAME = "v2job";
064
065    public V2JobServlet() {
066        super(INSTRUMENTATION_NAME);
067    }
068
069    @Override
070    protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
071        JsonBean jobBean = super.getWorkflowJobBean(request, response);
072        return jobBean;
073    }
074
075    @Override
076    protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response) throws XServletException {
077        JsonBean actionBean = super.getWorkflowActionBean(request, response);
078        return actionBean;
079    }
080
081    @Override
082    protected int getCoordinatorJobLength(int defaultLen, int len) {
083        return (len < 0) ? defaultLen : len;
084    }
085
086    @Override
087    protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
088            IOException {
089        String topicName;
090        String jobId = getResourceName(request);
091        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
092        try {
093            topicName = dagEngine.getJMSTopicName(jobId);
094        }
095        catch (DagEngineException ex) {
096            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
097        }
098        return topicName;
099    }
100
101    @Override
102    protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response)
103            throws XServletException, IOException {
104        return super.getJobsByParentId(request, response);
105    }
106
107    /**
108     * Update coord job.
109     *
110     * @param request the request
111     * @param response the response
112     * @return the JSON object
113     * @throws XServletException the x servlet exception
114     * @throws IOException Signals that an I/O exception has occurred.
115     */
116    @SuppressWarnings("unchecked")
117    @Override
118    protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
119            throws XServletException, IOException {
120        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
121                .getCoordinatorEngine(getUser(request));
122        JSONObject json = new JSONObject();
123        try {
124            String jobId = getResourceName(request);
125            boolean dryrun = StringUtils.isEmpty(request.getParameter(RestConstants.JOB_ACTION_DRYRUN)) ? false
126                    : Boolean.parseBoolean(request.getParameter(RestConstants.JOB_ACTION_DRYRUN));
127            boolean showDiff = StringUtils.isEmpty(request.getParameter(RestConstants.JOB_ACTION_SHOWDIFF)) ? true
128                    : Boolean.parseBoolean(request.getParameter(RestConstants.JOB_ACTION_SHOWDIFF));
129
130            String diff = coordEngine.updateJob(conf, jobId, dryrun, showDiff);
131            JSONObject diffJson = new JSONObject();
132            diffJson.put(JsonTags.COORD_UPDATE_DIFF, diff);
133            json.put(JsonTags.COORD_UPDATE, diffJson);
134        }
135        catch (CoordinatorEngineException e) {
136            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
137        }
138        return json;
139    }
140
141    /**
142     * Ignore a coordinator job
143     * @param request request object
144     * @param response response object
145     * @throws XServletException
146     * @throws IOException
147     */
148    @Override
149    protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
150        String jobId = getResourceName(request);
151        if (jobId.endsWith("-W")) {
152            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Workflow Ignore Not supported");
153        } else if (jobId.endsWith("-B")) {
154            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Bundle Ignore Not supported");
155        } else {
156            return ignoreCoordinatorJob(request, response);
157        }
158
159    }
160
161    @Override
162    protected void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
163            IOException {
164        String jobId = getResourceName(request);
165        String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
166        String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
167        String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM);
168        try {
169            getBaseEngine(jobId, getUser(request)).enableSLAAlert(jobId, actions, dates, childIds);
170        }
171        catch (BaseEngineException e) {
172            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
173        }
174
175    }
176
177    @Override
178    protected void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
179            IOException {
180        String jobId = getResourceName(request);
181        String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
182        String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
183        String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM);
184        try {
185            getBaseEngine(jobId, getUser(request)).disableSLAAlert(jobId, actions, dates, childIds);
186        }
187        catch (BaseEngineException e) {
188            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
189        }
190    }
191
192    @Override
193    protected void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
194        String jobId = getResourceName(request);
195        String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
196        String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
197        String newParams = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
198        String coords = request.getParameter(RestConstants.COORDINATORS_PARAM);
199
200        try {
201            getBaseEngine(jobId, getUser(request)).changeSLA(jobId, actions, dates, coords, newParams);
202        }
203        catch (BaseEngineException e) {
204            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
205        }
206    }
207
208    /**
209     * Ignore a coordinator job/action
210     *
211     * @param request servlet request
212     * @param response servlet response
213     * @throws XServletException
214     */
215    @SuppressWarnings("unchecked")
216    private JSONObject ignoreCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
217            throws XServletException {
218        JSONObject json = null;
219        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
220                getUser(request));
221        String jobId = getResourceName(request);
222        String type = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM);
223        String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM);
224        String changeValue = "status=" + CoordinatorAction.Status.IGNORED;
225        List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
226        try {
227            if (type != null && !type.equals(RestConstants.JOB_COORD_SCOPE_ACTION)) {
228                throw new CommandException(ErrorCode.E1024, "Currently ignore only support -action option");
229            }
230            CoordinatorActionInfo coordInfo = null;
231            if(scope == null || scope.isEmpty()) {
232                coordEngine.change(jobId, changeValue);
233            } else{
234                coordInfo = coordEngine.ignore(jobId, type, scope);
235            }
236            if(coordInfo != null) {
237                coordActions = coordInfo.getCoordActions();
238                json = new JSONObject();
239                json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
240            }
241            return json;
242        }
243        catch (CommandException ex) {
244            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
245        }
246        catch (CoordinatorEngineException ex) {
247            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
248        }
249    }
250
251    @Override
252    @SuppressWarnings("unchecked")
253    protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException,
254            IOException {
255        String status;
256        String jobId = getResourceName(request);
257        try {
258            if (jobId.endsWith("-B") || jobId.endsWith("-W")) {
259                status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId);
260            }
261            else if (jobId.contains("C@")) {
262                CoordinatorEngine engine = Services.get().get(CoordinatorEngineService.class)
263                        .getCoordinatorEngine(getUser(request));
264                status = engine.getActionStatus(jobId);
265            }
266            else {
267                status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId);
268            }
269
270        } catch (BaseEngineException ex) {
271            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
272        }
273        return status;
274    }
275    @SuppressWarnings("unchecked")
276    @Override
277    protected void streamJobErrorLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
278            IOException {
279
280        String jobId = getResourceName(request);
281        try {
282            getBaseEngine(jobId, getUser(request)).streamErrorLog(jobId, response.getWriter(), request.getParameterMap());
283        }
284        catch (DagEngineException ex) {
285            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
286        }
287        catch (BaseEngineException e) {
288            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
289        }
290    }
291
292    @SuppressWarnings("unchecked")
293    @Override
294    protected void streamJobAuditLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
295            IOException {
296
297        String jobId = getResourceName(request);
298        try {
299            getBaseEngine(jobId, getUser(request)).streamAuditLog(jobId, response.getWriter(), request.getParameterMap());
300        }
301        catch (DagEngineException ex) {
302            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
303        }
304        catch (BaseEngineException e) {
305            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
306        }
307    }
308
309    @SuppressWarnings("unchecked")
310    @Override
311    JSONArray getActionRetries(HttpServletRequest request, HttpServletResponse response)
312            throws XServletException, IOException {
313        JSONArray jsonArray = new JSONArray();
314        String jobId = getResourceName(request);
315        try {
316            jsonArray.addAll(Services.get().get(DagEngineService.class).getDagEngine(getUser(request))
317                    .getWorkflowActionRetries(jobId));
318            return jsonArray;
319        }
320        catch (BaseEngineException ex) {
321            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
322        }
323    }
324
325    @SuppressWarnings("unchecked")
326    @Override
327    protected JSONObject getCoordActionMissingDependencies(HttpServletRequest request, HttpServletResponse response)
328            throws XServletException, IOException {
329        String jobId = getResourceName(request);
330        String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
331        String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
332
333        try {
334            List<Pair<CoordinatorActionBean, Map<String, ActionDependency>>> dependenciesList = Services.get()
335                    .get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request))
336                    .getCoordActionMissingDependencies(jobId, actions, dates);
337            JSONArray dependenciesArray = new JSONArray();
338            for (Pair<CoordinatorActionBean, Map<String, ActionDependency>> dependencies : dependenciesList) {
339                JSONObject json = new JSONObject();
340                JSONArray parentJsonArray = new JSONArray();
341
342                for (String key : dependencies.getSecond().keySet()) {
343                    JSONObject dependencyList = new JSONObject();
344                    JSONArray jsonArray = new JSONArray();
345                    jsonArray.addAll(dependencies.getSecond().get(key).getMissingDependencies());
346                    dependencyList.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, jsonArray);
347                    dependencyList.put(JsonTags.COORDINATOR_ACTION_DATASET, key);
348                    parentJsonArray.add(dependencyList);
349                }
350                json.put(JsonTags.COORD_ACTION_FIRST_MISSING_DEPENDENCIES,
351                        CoordCommandUtils.getFirstMissingDependency(dependencies.getFirst()));
352                json.put(JsonTags.COORDINATOR_ACTION_ID, dependencies.getFirst().getActionNumber());
353                json.put(JsonTags.COORDINATOR_ACTION_DATASETS, parentJsonArray);
354                dependenciesArray.add(json);
355            }
356            JSONObject jsonObject = new JSONObject();
357            jsonObject.put(JsonTags.COORD_ACTION_MISSING_DEPENDENCIES, dependenciesArray);
358            return jsonObject;
359        }
360        catch (CommandException e) {
361            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
362        }
363    }
364
365    /**
366     * Gets the base engine based on jobId.
367     *
368     * @param jobId the jobId
369     * @param user the user
370     * @return the baseEngine
371     */
372    final public BaseEngine getBaseEngine(String jobId, String user) {
373        if (jobId.endsWith("-W")) {
374            return Services.get().get(DagEngineService.class).getDagEngine(user);
375        }
376        else if (jobId.endsWith("-B")) {
377            return Services.get().get(BundleEngineService.class).getBundleEngine(user);
378        }
379        else if (jobId.contains("-C")) {
380            return Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(user);
381        }
382        else {
383            throw new RuntimeException("Unknown job Type");
384        }
385    }
386
387    @Override
388    protected JSONObject getWfActionByJobIdAndName(HttpServletRequest request, HttpServletResponse response)
389            throws XServletException, IOException {
390        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
391                getUser(request));
392        String jobId = getResourceName(request);
393        String action = request.getParameter(RestConstants.ACTION_NAME_PARAM);
394        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
395        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
396        String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM);
397        timeZoneId = (timeZoneId == null) ? "GMT" : timeZoneId;
398
399        if (action == null) {
400            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST,
401                    ErrorCode.E0305, RestConstants.ACTION_NAME_PARAM);
402        }
403
404        int offset = (startStr != null) ? Integer.parseInt(startStr) : 1;
405        offset = (offset < 1) ? 1 : offset;
406        /**
407         * set default number of wf actions to be retrieved to
408         * default number of coordinator actions to be retrieved
409         **/
410        int defaultLen = ConfigurationService.getInt(COORD_ACTIONS_DEFAULT_LENGTH);
411        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
412        len = getCoordinatorJobLength(defaultLen, len);
413
414        try {
415            JSONObject json = new JSONObject();
416            List<CoordinatorWfActionBean> coordWfActions = coordEngine.getWfActionByJobIdAndName(jobId, action, offset, len);
417            JSONArray array = new JSONArray();
418            for (CoordinatorWfActionBean coordWfAction : coordWfActions) {
419                array.add(coordWfAction.toJSONObject(timeZoneId));
420            }
421            json.put(JsonTags.COORDINATOR_JOB_ID, jobId);
422            json.put(JsonTags.COORDINATOR_WF_ACTIONS, array);
423            return json;
424        }
425        catch (CoordinatorEngineException ex) {
426            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
427        }
428    }
429}