001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.servlet; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024 025import javax.servlet.http.HttpServletRequest; 026import javax.servlet.http.HttpServletResponse; 027 028import org.apache.commons.lang.StringUtils; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.oozie.BaseEngine; 031import org.apache.oozie.BaseEngineException; 032import org.apache.oozie.BundleEngine; 033import org.apache.oozie.CoordinatorActionBean; 034import org.apache.oozie.CoordinatorActionInfo; 035import org.apache.oozie.CoordinatorEngine; 036import org.apache.oozie.CoordinatorEngineException; 037import org.apache.oozie.DagEngine; 038import org.apache.oozie.DagEngineException; 039import org.apache.oozie.ErrorCode; 040import org.apache.oozie.client.CoordinatorAction; 041import org.apache.oozie.client.rest.JsonBean; 042import org.apache.oozie.client.rest.JsonTags; 043import org.apache.oozie.client.rest.RestConstants; 044import org.apache.oozie.command.CommandException; 045import org.apache.oozie.service.BundleEngineService; 046import org.apache.oozie.service.CoordinatorEngineService; 047import org.apache.oozie.service.DagEngineService; 048import org.apache.oozie.service.Services; 049import org.json.simple.JSONObject; 050 051@SuppressWarnings("serial") 052public class V2JobServlet extends V1JobServlet { 053 054 private static final String INSTRUMENTATION_NAME = "v2job"; 055 056 public V2JobServlet() { 057 super(INSTRUMENTATION_NAME); 058 } 059 060 @Override 061 protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException { 062 JsonBean jobBean = super.getWorkflowJobBean(request, response); 063 return jobBean; 064 } 065 066 @Override 067 protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response) throws XServletException { 068 JsonBean actionBean = super.getWorkflowActionBean(request, response); 069 return actionBean; 070 } 071 072 @Override 073 protected int getCoordinatorJobLength(int defaultLen, int len) { 074 return (len < 0) ? defaultLen : len; 075 } 076 077 @Override 078 protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException, 079 IOException { 080 String topicName; 081 String jobId = getResourceName(request); 082 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 083 try { 084 topicName = dagEngine.getJMSTopicName(jobId); 085 } 086 catch (DagEngineException ex) { 087 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 088 } 089 return topicName; 090 } 091 092 @Override 093 protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response) 094 throws XServletException, IOException { 095 return super.getJobsByParentId(request, response); 096 } 097 098 /** 099 * Update coord job. 100 * 101 * @param request the request 102 * @param response the response 103 * @return the JSON object 104 * @throws XServletException the x servlet exception 105 * @throws IOException Signals that an I/O exception has occurred. 106 */ 107 @SuppressWarnings("unchecked") 108 @Override 109 protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 110 throws XServletException, IOException { 111 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class) 112 .getCoordinatorEngine(getUser(request)); 113 JSONObject json = new JSONObject(); 114 try { 115 String jobId = getResourceName(request); 116 boolean dryrun = StringUtils.isEmpty(request.getParameter(RestConstants.JOB_ACTION_DRYRUN)) ? false 117 : Boolean.parseBoolean(request.getParameter(RestConstants.JOB_ACTION_DRYRUN)); 118 boolean showDiff = StringUtils.isEmpty(request.getParameter(RestConstants.JOB_ACTION_SHOWDIFF)) ? true 119 : Boolean.parseBoolean(request.getParameter(RestConstants.JOB_ACTION_SHOWDIFF)); 120 121 String diff = coordEngine.updateJob(conf, jobId, dryrun, showDiff); 122 JSONObject diffJson = new JSONObject(); 123 diffJson.put(JsonTags.COORD_UPDATE_DIFF, diff); 124 json.put(JsonTags.COORD_UPDATE, diffJson); 125 } 126 catch (CoordinatorEngineException e) { 127 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 128 } 129 return json; 130 } 131 132 /** 133 * Ignore a coordinator job 134 * @param request request object 135 * @param response response object 136 * @throws XServletException 137 * @throws IOException 138 */ 139 @Override 140 protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 141 String jobId = getResourceName(request); 142 if (jobId.endsWith("-W")) { 143 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Workflow Ignore Not supported"); 144 } else if (jobId.endsWith("-B")) { 145 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Bundle Ignore Not supported"); 146 } else { 147 return ignoreCoordinatorJob(request, response); 148 } 149 150 } 151 152 @Override 153 protected void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, 154 IOException { 155 String jobId = getResourceName(request); 156 String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST); 157 String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE); 158 String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM); 159 try { 160 getBaseEngine(jobId, getUser(request)).enableSLAAlert(jobId, actions, dates, childIds); 161 } 162 catch (BaseEngineException e) { 163 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 164 } 165 166 } 167 168 @Override 169 protected void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, 170 IOException { 171 String jobId = getResourceName(request); 172 String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST); 173 String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE); 174 String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM); 175 try { 176 getBaseEngine(jobId, getUser(request)).disableSLAAlert(jobId, actions, dates, childIds); 177 } 178 catch (BaseEngineException e) { 179 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 180 } 181 } 182 183 @Override 184 protected void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 185 String jobId = getResourceName(request); 186 String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST); 187 String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE); 188 String newParams = request.getParameter(RestConstants.JOB_CHANGE_VALUE); 189 String coords = request.getParameter(RestConstants.COORDINATORS_PARAM); 190 191 try { 192 getBaseEngine(jobId, getUser(request)).changeSLA(jobId, actions, dates, coords, newParams); 193 } 194 catch (BaseEngineException e) { 195 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 196 } 197 } 198 199 /** 200 * Ignore a coordinator job/action 201 * 202 * @param request servlet request 203 * @param response servlet response 204 * @throws XServletException 205 */ 206 @SuppressWarnings("unchecked") 207 private JSONObject ignoreCoordinatorJob(HttpServletRequest request, HttpServletResponse response) 208 throws XServletException { 209 JSONObject json = null; 210 CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine( 211 getUser(request)); 212 String jobId = getResourceName(request); 213 String type = request.getParameter(RestConstants.JOB_COORD_RANGE_TYPE_PARAM); 214 String scope = request.getParameter(RestConstants.JOB_COORD_SCOPE_PARAM); 215 String changeValue = "status=" + CoordinatorAction.Status.IGNORED; 216 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); 217 try { 218 if (type != null && !type.equals(RestConstants.JOB_COORD_SCOPE_ACTION)) { 219 throw new CommandException(ErrorCode.E1024, "Currently ignore only support -action option"); 220 } 221 CoordinatorActionInfo coordInfo = null; 222 if(scope == null || scope.isEmpty()) { 223 coordEngine.change(jobId, changeValue); 224 } else{ 225 coordInfo = coordEngine.ignore(jobId, type, scope); 226 } 227 if(coordInfo != null) { 228 coordActions = coordInfo.getCoordActions(); 229 json = new JSONObject(); 230 json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT")); 231 } 232 return json; 233 } 234 catch (CommandException ex) { 235 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 236 } 237 catch (CoordinatorEngineException ex) { 238 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 239 } 240 } 241 242 @Override 243 @SuppressWarnings("unchecked") 244 protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException, 245 IOException { 246 String status; 247 String jobId = getResourceName(request); 248 try { 249 if (jobId.endsWith("-B") || jobId.endsWith("-W")) { 250 status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId); 251 } 252 else if (jobId.contains("C@")) { 253 CoordinatorEngine engine = Services.get().get(CoordinatorEngineService.class) 254 .getCoordinatorEngine(getUser(request)); 255 status = engine.getActionStatus(jobId); 256 } 257 else { 258 status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId); 259 } 260 261 } catch (BaseEngineException ex) { 262 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 263 } 264 return status; 265 } 266 @SuppressWarnings("unchecked") 267 @Override 268 protected void streamJobErrorLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 269 IOException { 270 271 String jobId = getResourceName(request); 272 try { 273 getBaseEngine(jobId, getUser(request)).streamErrorLog(jobId, response.getWriter(), request.getParameterMap()); 274 } 275 catch (DagEngineException ex) { 276 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 277 } 278 catch (BaseEngineException e) { 279 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 280 } 281 } 282 283 @SuppressWarnings("unchecked") 284 @Override 285 protected void streamJobAuditLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 286 IOException { 287 288 String jobId = getResourceName(request); 289 try { 290 getBaseEngine(jobId, getUser(request)).streamAuditLog(jobId, response.getWriter(), request.getParameterMap()); 291 } 292 catch (DagEngineException ex) { 293 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 294 } 295 catch (BaseEngineException e) { 296 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); 297 } 298 299 } 300 301 302 /** 303 * Gets the base engine based on jobId. 304 * 305 * @param jobId the jobId 306 * @param user the user 307 * @return the baseEngine 308 */ 309 final public BaseEngine getBaseEngine(String jobId, String user) { 310 if (jobId.endsWith("-W")) { 311 return Services.get().get(DagEngineService.class).getDagEngine(user); 312 } 313 else if (jobId.endsWith("-B")) { 314 return Services.get().get(BundleEngineService.class).getBundleEngine(user); 315 } 316 else if (jobId.contains("-C")) { 317 return Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(user); 318 } 319 else { 320 throw new RuntimeException("Unknown job Type"); 321 } 322 } 323 324}