001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.servlet; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025 026import javax.servlet.http.HttpServletRequest; 027import javax.servlet.http.HttpServletResponse; 028 029import org.apache.commons.lang.StringUtils; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.oozie.BaseEngine; 032import org.apache.oozie.BaseEngineException; 033import org.apache.oozie.BundleEngine; 034import org.apache.oozie.CoordinatorActionBean; 035import org.apache.oozie.CoordinatorActionInfo; 036import org.apache.oozie.CoordinatorEngine; 037import org.apache.oozie.CoordinatorEngineException; 038import org.apache.oozie.CoordinatorWfActionBean; 039import org.apache.oozie.WorkflowActionBean; 040import org.apache.oozie.DagEngine; 041import org.apache.oozie.DagEngineException; 042import org.apache.oozie.ErrorCode; 043import org.apache.oozie.client.CoordinatorAction; 044import org.apache.oozie.client.rest.JsonBean; 045import org.apache.oozie.client.rest.JsonTags; 046import org.apache.oozie.client.rest.RestConstants; 047import org.apache.oozie.command.CommandException; 048import org.apache.oozie.command.coord.CoordCommandUtils; 049import org.apache.oozie.command.wf.ActionXCommand; 050import org.apache.oozie.dependency.ActionDependency; 051import org.apache.oozie.service.BundleEngineService; 052import org.apache.oozie.service.CoordinatorEngineService; 053import org.apache.oozie.service.DagEngineService; 054import org.apache.oozie.service.Services; 055import org.apache.oozie.service.ConfigurationService; 056import org.apache.oozie.util.Pair; 057import org.json.simple.JSONArray; 058import org.json.simple.JSONObject; 059 060@SuppressWarnings("serial") 061public class V2JobServlet extends V1JobServlet { 062 063 private static final String INSTRUMENTATION_NAME = "v2job"; 064 065 public V2JobServlet() { 066 super(INSTRUMENTATION_NAME); 067 } 068 069 @Override 070 protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 071 JsonBean jobBean = super.getWorkflowJobBean(request, response); 072 return jobBean; 073 } 074 075 @Override 076 protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response) throws XServletException { 077 JsonBean actionBean = super.getWorkflowActionBean(request, response); 078 return actionBean; 079 } 080 081 @Override 082 protected int getCoordinatorJobLength(int defaultLen, int len) { 083 return (len < 0) ? defaultLen : len; 084 } 085 086 @Override 087 protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException, 088 IOException { 089 String topicName; 090 String jobId = getResourceName(request); 091 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 092 try { 093 topicName = dagEngine.getJMSTopicName(jobId); 094 } 095 catch (DagEngineException ex) { 096 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 097 } 098 return topicName; 099 } 100 101 @Override 102 protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response) 103 throws XServletException, IOException { 104 return super.getJobsByParentId(request, response); 105 } 106 107 /** 108 * Update coord job. 109 * 110 * @param request the request 111 * @param response the response 112 * @return the JSON object 113 * @throws XServletException the x servlet exception 114 * @throws IOException Signals that an I/O exception has occurred. 115 */ 116 @SuppressWarnings("unchecked") 117 @Override 118 protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 119 throws XServletException, IOException { 120 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class) 121 .getCoordinatorEngine(getUser(request)); 122 JSONObject json = new JSONObject(); 123 try { 124 String jobId = getResourceName(request); 125 boolean dryrun = StringUtils.isEmpty(request.getParameter(RestConstants.JOB_ACTION_DRYRUN)) ? false 126 : Boolean.parseBoolean(request.getParameter(RestConstants.JOB_ACTION_DRYRUN)); 127 boolean showDiff = StringUtils.isEmpty(request.getParameter(RestConstants.JOB_ACTION_SHOWDIFF)) ? true 128 : Boolean.parseBoolean(request.getParameter(RestConstants.JOB_ACTION_SHOWDIFF)); 129 130 String diff = coordEngine.updateJob(conf, jobId, dryrun, showDiff); 131 JSONObject diffJson = new JSONObject(); 132 diffJson.put(JsonTags.COORD_UPDATE_DIFF, diff); 133 json.put(JsonTags.COORD_UPDATE, diffJson); 134 } 135 catch (CoordinatorEngineException e) { 136 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 137 } 138 return json; 139 } 140 141 /** 142 * Ignore a coordinator job 143 * @param request request object 144 * @param response response object 145 * @throws XServletException 146 * @throws IOException 147 */ 148 @Override 149 protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 150 String jobId = getResourceName(request); 151 if (jobId.endsWith("-W")) { 152 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Workflow Ignore Not supported"); 153 } else if (jobId.endsWith("-B")) { 154 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Bundle Ignore Not supported"); 155 } else { 156 return ignoreCoordinatorJob(request, response); 157 } 158 159 } 160 161 @Override 162 protected void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, 163 IOException { 164 String jobId = getResourceName(request); 165 String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST); 166 String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE); 167 String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM); 168 try { 169 getBaseEngine(jobId, getUser(request)).enableSLAAlert(jobId, actions, dates, childIds); 170 } 171 catch (BaseEngineException e) { 172 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 173 } 174 175 } 176 177 @Override 178 protected void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, 179 IOException { 180 String jobId = getResourceName(request); 181 String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST); 182 String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE); 183 String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM); 184 try { 185 getBaseEngine(jobId, getUser(request)).disableSLAAlert(jobId, actions, dates, childIds); 186 } 187 catch (BaseEngineException e) { 188 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 189 } 190 } 191 192 @Override 193 protected void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 194 String jobId = getResourceName(request); 195 String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST); 196 String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE); 197 String newParams = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 198 String coords = request.getParameter(RestConstants.COORDINATORS_PARAM); 199 200 try { 201 getBaseEngine(jobId, getUser(request)).changeSLA(jobId, actions, dates, coords, newParams); 202 } 203 catch (BaseEngineException e) { 204 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 205 } 206 } 207 208 /** 209 * Ignore a coordinator job/action 210 * 211 * @param request servlet request 212 * @param response servlet response 213 * @throws XServletException 214 */ 215 @SuppressWarnings("unchecked") 216 private JSONObject ignoreCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 217 throws XServletException { 218 JSONObject json = null; 219 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 220 getUser(request)); 221 String jobId = getResourceName(request); 222 String type = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM); 223 String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM); 224 String changeValue = "status=" + CoordinatorAction.Status.IGNORED; 225 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); 226 try { 227 if (type != null && !type.equals(RestConstants.JOB_COORD_SCOPE_ACTION)) { 228 throw new CommandException(ErrorCode.E1024, "Currently ignore only support -action option"); 229 } 230 CoordinatorActionInfo coordInfo = null; 231 if(scope == null || scope.isEmpty()) { 232 coordEngine.change(jobId, changeValue); 233 } else{ 234 coordInfo = coordEngine.ignore(jobId, type, scope); 235 } 236 if(coordInfo != null) { 237 coordActions = coordInfo.getCoordActions(); 238 json = new JSONObject(); 239 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT")); 240 } 241 return json; 242 } 243 catch (CommandException ex) { 244 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 245 } 246 catch (CoordinatorEngineException ex) { 247 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 248 } 249 } 250 251 @Override 252 @SuppressWarnings("unchecked") 253 protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException, 254 IOException { 255 String status; 256 String jobId = getResourceName(request); 257 try { 258 if (jobId.endsWith("-B") || jobId.endsWith("-W")) { 259 status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId); 260 } 261 else if (jobId.contains("C@")) { 262 CoordinatorEngine engine = Services.get().get(CoordinatorEngineService.class) 263 .getCoordinatorEngine(getUser(request)); 264 status = engine.getActionStatus(jobId); 265 } 266 else { 267 status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId); 268 } 269 270 } catch (BaseEngineException ex) { 271 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 272 } 273 return status; 274 } 275 @SuppressWarnings("unchecked") 276 @Override 277 protected void streamJobErrorLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 278 IOException { 279 280 String jobId = getResourceName(request); 281 try { 282 getBaseEngine(jobId, getUser(request)).streamErrorLog(jobId, response.getWriter(), request.getParameterMap()); 283 } 284 catch (DagEngineException ex) { 285 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 286 } 287 catch (BaseEngineException e) { 288 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 289 } 290 } 291 292 @SuppressWarnings("unchecked") 293 @Override 294 protected void streamJobAuditLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 295 IOException { 296 297 String jobId = getResourceName(request); 298 try { 299 getBaseEngine(jobId, getUser(request)).streamAuditLog(jobId, response.getWriter(), request.getParameterMap()); 300 } 301 catch (DagEngineException ex) { 302 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 303 } 304 catch (BaseEngineException e) { 305 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 306 } 307 } 308 309 @SuppressWarnings("unchecked") 310 @Override 311 JSONArray getActionRetries(HttpServletRequest request, HttpServletResponse response) 312 throws XServletException, IOException { 313 JSONArray jsonArray = new JSONArray(); 314 String jobId = getResourceName(request); 315 try { 316 jsonArray.addAll(Services.get().get(DagEngineService.class).getDagEngine(getUser(request)) 317 .getWorkflowActionRetries(jobId)); 318 return jsonArray; 319 } 320 catch (BaseEngineException ex) { 321 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 322 } 323 } 324 325 @SuppressWarnings("unchecked") 326 @Override 327 protected JSONObject getCoordActionMissingDependencies(HttpServletRequest request, HttpServletResponse response) 328 throws XServletException, IOException { 329 String jobId = getResourceName(request); 330 String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST); 331 String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE); 332 333 try { 334 List<Pair<CoordinatorActionBean, Map<String, ActionDependency>>> dependenciesList = Services.get() 335 .get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request)) 336 .getCoordActionMissingDependencies(jobId, actions, dates); 337 JSONArray dependenciesArray = new JSONArray(); 338 for (Pair<CoordinatorActionBean, Map<String, ActionDependency>> dependencies : dependenciesList) { 339 JSONObject json = new JSONObject(); 340 JSONArray parentJsonArray = new JSONArray(); 341 342 for (String key : dependencies.getSecond().keySet()) { 343 JSONObject dependencyList = new JSONObject(); 344 JSONArray jsonArray = new JSONArray(); 345 jsonArray.addAll(dependencies.getSecond().get(key).getMissingDependencies()); 346 dependencyList.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, jsonArray); 347 dependencyList.put(JsonTags.COORDINATOR_ACTION_DATASET, key); 348 parentJsonArray.add(dependencyList); 349 } 350 json.put(JsonTags.COORD_ACTION_FIRST_MISSING_DEPENDENCIES, 351 CoordCommandUtils.getFirstMissingDependency(dependencies.getFirst())); 352 json.put(JsonTags.COORDINATOR_ACTION_ID, dependencies.getFirst().getActionNumber()); 353 json.put(JsonTags.COORDINATOR_ACTION_DATASETS, parentJsonArray); 354 dependenciesArray.add(json); 355 } 356 JSONObject jsonObject = new JSONObject(); 357 jsonObject.put(JsonTags.COORD_ACTION_MISSING_DEPENDENCIES, dependenciesArray); 358 return jsonObject; 359 } 360 catch (CommandException e) { 361 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 362 } 363 } 364 365 /** 366 * Gets the base engine based on jobId. 367 * 368 * @param jobId the jobId 369 * @param user the user 370 * @return the baseEngine 371 */ 372 final public BaseEngine getBaseEngine(String jobId, String user) { 373 if (jobId.endsWith("-W")) { 374 return Services.get().get(DagEngineService.class).getDagEngine(user); 375 } 376 else if (jobId.endsWith("-B")) { 377 return Services.get().get(BundleEngineService.class).getBundleEngine(user); 378 } 379 else if (jobId.contains("-C")) { 380 return Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(user); 381 } 382 else { 383 throw new RuntimeException("Unknown job Type"); 384 } 385 } 386 387 @Override 388 protected JSONObject getWfActionByJobIdAndName(HttpServletRequest request, HttpServletResponse response) 389 throws XServletException, IOException { 390 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 391 getUser(request)); 392 String jobId = getResourceName(request); 393 String action = request.getParameter(RestConstants.ACTION_NAME_PARAM); 394 String startStr = request.getParameter(RestConstants.OFFSET_PARAM); 395 String lenStr = request.getParameter(RestConstants.LEN_PARAM); 396 String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM); 397 timeZoneId = (timeZoneId == null) ? "GMT" : timeZoneId; 398 399 if (action == null) { 400 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, 401 ErrorCode.E0305, RestConstants.ACTION_NAME_PARAM); 402 } 403 404 int offset = (startStr != null) ? Integer.parseInt(startStr) : 1; 405 offset = (offset < 1) ? 1 : offset; 406 /** 407 * set default number of wf actions to be retrieved to 408 * default number of coordinator actions to be retrieved 409 **/ 410 int defaultLen = ConfigurationService.getInt(COORD_ACTIONS_DEFAULT_LENGTH); 411 int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0; 412 len = getCoordinatorJobLength(defaultLen, len); 413 414 try { 415 JSONObject json = new JSONObject(); 416 List<CoordinatorWfActionBean> coordWfActions = coordEngine.getWfActionByJobIdAndName(jobId, action, offset, len); 417 JSONArray array = new JSONArray(); 418 for (CoordinatorWfActionBean coordWfAction : coordWfActions) { 419 array.add(coordWfAction.toJSONObject(timeZoneId)); 420 } 421 json.put(JsonTags.COORDINATOR_JOB_ID, jobId); 422 json.put(JsonTags.COORDINATOR_WF_ACTIONS, array); 423 return json; 424 } 425 catch (CoordinatorEngineException ex) { 426 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 427 } 428 } 429}