001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.servlet; 019 020import java.io.IOException; 021 022import javax.servlet.http.HttpServletRequest; 023import javax.servlet.http.HttpServletResponse; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.oozie.DagEngine; 027import org.apache.oozie.DagEngineException; 028import org.apache.oozie.client.rest.JsonBean; 029import org.apache.oozie.service.DagEngineService; 030import org.apache.oozie.service.Services; 031import org.json.simple.JSONObject; 032import org.apache.oozie.ErrorCode; 033 034@SuppressWarnings("serial") 035public class V0JobServlet extends BaseJobServlet { 036 037 private static final String INSTRUMENTATION_NAME = "v0job"; 038 039 public V0JobServlet() { 040 super(INSTRUMENTATION_NAME); 041 } 042 043 /* 044 * v0 service method to start a job 045 */ 046 @Override 047 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 048 IOException { 049 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 050 051 String jobId = getResourceName(request); 052 try { 053 dagEngine.start(jobId); 054 } 055 catch (DagEngineException ex) { 056 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 057 } 058 } 059 060 /* 061 * v0 service method to resume a job 062 */ 063 @Override 064 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 065 IOException { 066 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 067 068 String jobId = getResourceName(request); 069 try { 070 dagEngine.resume(jobId); 071 } 072 catch (DagEngineException ex) { 073 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 074 } 075 } 076 077 /* 078 * v0 service method to suspend a job 079 */ 080 @Override 081 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 082 IOException { 083 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 084 085 String jobId = getResourceName(request); 086 try { 087 dagEngine.suspend(jobId); 088 } 089 catch (DagEngineException ex) { 090 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 091 } 092 } 093 094 /* 095 * v0 service method to kill a job 096 */ 097 @Override 098 protected JSONObject killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 099 IOException { 100 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 101 102 String jobId = getResourceName(request); 103 try { 104 dagEngine.kill(jobId); 105 } 106 catch (DagEngineException ex) { 107 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 108 } 109 return null; 110 } 111 112 /* 113 * v0 service method to change a job 114 */ 115 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 116 IOException { 117 // This code should not be reached. But if it happens somehow, we throw 118 // bad request exception. 119 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E1014); 120 } 121 122 /* 123 * v0 service method to reRun a job 124 */ 125 @Override 126 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 127 throws XServletException, IOException { 128 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 129 130 String jobId = getResourceName(request); 131 try { 132 dagEngine.reRun(jobId, conf); 133 } 134 catch (DagEngineException ex) { 135 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 136 } 137 return null; 138 } 139 140 /* 141 * v0 service method to get a job in JsonBean representation 142 */ 143 @Override 144 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 145 IOException { 146 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 147 148 JsonBean jobBean = null; 149 String jobId = getResourceName(request); 150 try { 151 jobBean = (JsonBean) dagEngine.getJob(jobId); 152 } 153 catch (DagEngineException ex) { 154 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 155 } 156 157 return jobBean; 158 } 159 160 /* 161 * v0 service method to get a job definition in String format 162 */ 163 @Override 164 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 165 throws XServletException, IOException { 166 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 167 168 String wfDefinition = null; 169 String jobId = getResourceName(request); 170 try { 171 wfDefinition = dagEngine.getDefinition(jobId); 172 } 173 catch (DagEngineException ex) { 174 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 175 } 176 return wfDefinition; 177 } 178 179 /* 180 * v0 service method to stream a job log into response object 181 */ 182 @Override 183 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 184 IOException { 185 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); 186 187 String jobId = getResourceName(request); 188 try { 189 dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap()); 190 } 191 catch (DagEngineException ex) { 192 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 193 } 194 } 195 196 /* 197 * Not implemented in v0 198 */ 199 @Override 200 protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response) 201 throws XServletException, IOException { 202 // Should this error code be NOT_IMPLEMENTED? 203 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306); 204 } 205 206 @Override 207 protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException, 208 IOException { 209 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306); 210 } 211 212 @Override 213 protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response) throws XServletException, 214 IOException { 215 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); 216 } 217 218 @Override 219 protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 220 throws XServletException, IOException { 221 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); 222 } 223 224 @Override 225 protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { 226 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); 227 } 228}