001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.servlet;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024
025import javax.servlet.http.HttpServletRequest;
026import javax.servlet.http.HttpServletResponse;
027
028import org.apache.commons.lang.StringUtils;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.oozie.BaseEngine;
031import org.apache.oozie.BaseEngineException;
032import org.apache.oozie.BundleEngine;
033import org.apache.oozie.CoordinatorActionBean;
034import org.apache.oozie.CoordinatorActionInfo;
035import org.apache.oozie.CoordinatorEngine;
036import org.apache.oozie.CoordinatorEngineException;
037import org.apache.oozie.DagEngine;
038import org.apache.oozie.DagEngineException;
039import org.apache.oozie.ErrorCode;
040import org.apache.oozie.client.CoordinatorAction;
041import org.apache.oozie.client.rest.JsonBean;
042import org.apache.oozie.client.rest.JsonTags;
043import org.apache.oozie.client.rest.RestConstants;
044import org.apache.oozie.command.CommandException;
045import org.apache.oozie.service.BundleEngineService;
046import org.apache.oozie.service.CoordinatorEngineService;
047import org.apache.oozie.service.DagEngineService;
048import org.apache.oozie.service.Services;
049import org.json.simple.JSONObject;
050
051@SuppressWarnings("serial")
052public class V2JobServlet extends V1JobServlet {
053
054    private static final String INSTRUMENTATION_NAME = "v2job";
055
056    public V2JobServlet() {
057        super(INSTRUMENTATION_NAME);
058    }
059
060    @Override
061    protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
062        JsonBean jobBean = super.getWorkflowJobBean(request, response);
063        return jobBean;
064    }
065
066    @Override
067    protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response) throws XServletException {
068        JsonBean actionBean = super.getWorkflowActionBean(request, response);
069        return actionBean;
070    }
071
072    @Override
073    protected int getCoordinatorJobLength(int defaultLen, int len) {
074        return (len < 0) ? defaultLen : len;
075    }
076
077    @Override
078    protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
079            IOException {
080        String topicName;
081        String jobId = getResourceName(request);
082        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
083        try {
084            topicName = dagEngine.getJMSTopicName(jobId);
085        }
086        catch (DagEngineException ex) {
087            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
088        }
089        return topicName;
090    }
091
092    @Override
093    protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response)
094            throws XServletException, IOException {
095        return super.getJobsByParentId(request, response);
096    }
097
098    /**
099     * Update coord job.
100     *
101     * @param request the request
102     * @param response the response
103     * @return the JSON object
104     * @throws XServletException the x servlet exception
105     * @throws IOException Signals that an I/O exception has occurred.
106     */
107    @SuppressWarnings("unchecked")
108    @Override
109    protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
110            throws XServletException, IOException {
111        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class)
112                .getCoordinatorEngine(getUser(request));
113        JSONObject json = new JSONObject();
114        try {
115            String jobId = getResourceName(request);
116            boolean dryrun = StringUtils.isEmpty(request.getParameter(RestConstants.JOB_ACTION_DRYRUN)) ? false
117                    : Boolean.parseBoolean(request.getParameter(RestConstants.JOB_ACTION_DRYRUN));
118            boolean showDiff = StringUtils.isEmpty(request.getParameter(RestConstants.JOB_ACTION_SHOWDIFF)) ? true
119                    : Boolean.parseBoolean(request.getParameter(RestConstants.JOB_ACTION_SHOWDIFF));
120
121            String diff = coordEngine.updateJob(conf, jobId, dryrun, showDiff);
122            JSONObject diffJson = new JSONObject();
123            diffJson.put(JsonTags.COORD_UPDATE_DIFF, diff);
124            json.put(JsonTags.COORD_UPDATE, diffJson);
125        }
126        catch (CoordinatorEngineException e) {
127            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
128        }
129        return json;
130    }
131
132    /**
133     * Ignore a coordinator job
134     * @param request request object
135     * @param response response object
136     * @throws XServletException
137     * @throws IOException
138     */
139    @Override
140    protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
141        String jobId = getResourceName(request);
142        if (jobId.endsWith("-W")) {
143            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Workflow Ignore Not supported");
144        } else if (jobId.endsWith("-B")) {
145            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Bundle Ignore Not supported");
146        } else {
147            return ignoreCoordinatorJob(request, response);
148        }
149
150    }
151
152    @Override
153    protected void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
154            IOException {
155        String jobId = getResourceName(request);
156        String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
157        String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
158        String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM);
159        try {
160            getBaseEngine(jobId, getUser(request)).enableSLAAlert(jobId, actions, dates, childIds);
161        }
162        catch (BaseEngineException e) {
163            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
164        }
165
166    }
167
168    @Override
169    protected void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException,
170            IOException {
171        String jobId = getResourceName(request);
172        String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
173        String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
174        String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM);
175        try {
176            getBaseEngine(jobId, getUser(request)).disableSLAAlert(jobId, actions, dates, childIds);
177        }
178        catch (BaseEngineException e) {
179            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
180        }
181    }
182
183    @Override
184    protected void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException {
185        String jobId = getResourceName(request);
186        String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST);
187        String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE);
188        String newParams = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
189        String coords = request.getParameter(RestConstants.COORDINATORS_PARAM);
190
191        try {
192            getBaseEngine(jobId, getUser(request)).changeSLA(jobId, actions, dates, coords, newParams);
193        }
194        catch (BaseEngineException e) {
195            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
196        }
197    }
198
199    /**
200     * Ignore a coordinator job/action
201     *
202     * @param request servlet request
203     * @param response servlet response
204     * @throws XServletException
205     */
206    @SuppressWarnings("unchecked")
207    private JSONObject ignoreCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
208            throws XServletException {
209        JSONObject json = null;
210        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
211                getUser(request));
212        String jobId = getResourceName(request);
213        String type = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM);
214        String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM);
215        String changeValue = "status=" + CoordinatorAction.Status.IGNORED;
216        List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
217        try {
218            if (type != null && !type.equals(RestConstants.JOB_COORD_SCOPE_ACTION)) {
219                throw new CommandException(ErrorCode.E1024, "Currently ignore only support -action option");
220            }
221            CoordinatorActionInfo coordInfo = null;
222            if(scope == null || scope.isEmpty()) {
223                coordEngine.change(jobId, changeValue);
224            } else{
225                coordInfo = coordEngine.ignore(jobId, type, scope);
226            }
227            if(coordInfo != null) {
228                coordActions = coordInfo.getCoordActions();
229                json = new JSONObject();
230                json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
231            }
232            return json;
233        }
234        catch (CommandException ex) {
235            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
236        }
237        catch (CoordinatorEngineException ex) {
238            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
239        }
240    }
241
242    @Override
243    @SuppressWarnings("unchecked")
244    protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException,
245            IOException {
246        String status;
247        String jobId = getResourceName(request);
248        try {
249            if (jobId.endsWith("-B") || jobId.endsWith("-W")) {
250                status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId);
251            }
252            else if (jobId.contains("C@")) {
253                CoordinatorEngine engine = Services.get().get(CoordinatorEngineService.class)
254                        .getCoordinatorEngine(getUser(request));
255                status = engine.getActionStatus(jobId);
256            }
257            else {
258                status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId);
259            }
260
261        } catch (BaseEngineException ex) {
262            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
263        }
264        return status;
265    }
266    @SuppressWarnings("unchecked")
267    @Override
268    protected void streamJobErrorLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
269            IOException {
270
271        String jobId = getResourceName(request);
272        try {
273            getBaseEngine(jobId, getUser(request)).streamErrorLog(jobId, response.getWriter(), request.getParameterMap());
274        }
275        catch (DagEngineException ex) {
276            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
277        }
278        catch (BaseEngineException e) {
279            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
280        }
281    }
282
283    @SuppressWarnings("unchecked")
284    @Override
285    protected void streamJobAuditLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
286            IOException {
287
288        String jobId = getResourceName(request);
289        try {
290            getBaseEngine(jobId, getUser(request)).streamAuditLog(jobId, response.getWriter(), request.getParameterMap());
291        }
292        catch (DagEngineException ex) {
293            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
294        }
295        catch (BaseEngineException e) {
296            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
297        }
298
299    }
300
301
302    /**
303     * Gets the base engine based on jobId.
304     *
305     * @param jobId the jobId
306     * @param user the user
307     * @return the baseEngine
308     */
309    final public BaseEngine getBaseEngine(String jobId, String user) {
310        if (jobId.endsWith("-W")) {
311            return Services.get().get(DagEngineService.class).getDagEngine(user);
312        }
313        else if (jobId.endsWith("-B")) {
314            return Services.get().get(BundleEngineService.class).getBundleEngine(user);
315        }
316        else if (jobId.contains("-C")) {
317            return Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(user);
318        }
319        else {
320            throw new RuntimeException("Unknown job Type");
321        }
322    }
323
324}