001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.servlet; 019 020 import java.io.IOException; 021 022 import javax.servlet.http.HttpServletRequest; 023 import javax.servlet.http.HttpServletResponse; 024 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.oozie.DagEngine; 027 import org.apache.oozie.DagEngineException; 028 import org.apache.oozie.client.rest.JsonBean; 029 import org.apache.oozie.service.DagEngineService; 030 import org.apache.oozie.service.Services; 031 import org.json.simple.JSONObject; 032 import org.apache.oozie.ErrorCode; 033 034 @SuppressWarnings("serial") 035 public class V0JobServlet extends BaseJobServlet { 036 037 private static final String INSTRUMENTATION_NAME = "v0job"; 038 039 public V0JobServlet() { 040 super(INSTRUMENTATION_NAME); 041 } 042 043 /* 044 * v0 service method to start a job 045 */ 046 @Override 047 protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 048 IOException { 049 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 050 getAuthToken(request)); 051 052 String jobId = getResourceName(request); 053 try { 054 dagEngine.start(jobId); 055 } 056 catch (DagEngineException ex) { 057 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 058 } 059 } 060 061 /* 062 * v0 service method to resume a job 063 */ 064 @Override 065 protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 066 IOException { 067 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 068 getAuthToken(request)); 069 070 String jobId = getResourceName(request); 071 try { 072 dagEngine.resume(jobId); 073 } 074 catch (DagEngineException ex) { 075 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 076 } 077 } 078 079 /* 080 * v0 service method to suspend a job 081 */ 082 @Override 083 protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 084 IOException { 085 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 086 getAuthToken(request)); 087 088 String jobId = getResourceName(request); 089 try { 090 dagEngine.suspend(jobId); 091 } 092 catch (DagEngineException ex) { 093 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 094 } 095 } 096 097 /* 098 * v0 service method to kill a job 099 */ 100 @Override 101 protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 102 IOException { 103 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 104 getAuthToken(request)); 105 106 String jobId = getResourceName(request); 107 try { 108 dagEngine.kill(jobId); 109 } 110 catch (DagEngineException ex) { 111 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 112 } 113 } 114 115 /* 116 * v0 service method to change a job 117 */ 118 protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 119 IOException { 120 // This code should not be reached. But if it happens somehow, we throw 121 // bad request exception. 122 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E1014); 123 } 124 125 /* 126 * v0 service method to reRun a job 127 */ 128 @Override 129 protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) 130 throws XServletException, IOException { 131 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 132 getAuthToken(request)); 133 134 String jobId = getResourceName(request); 135 try { 136 dagEngine.reRun(jobId, conf); 137 } 138 catch (DagEngineException ex) { 139 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 140 } 141 return null; 142 } 143 144 /* 145 * v0 service method to get a job in JsonBean representation 146 */ 147 @Override 148 protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, 149 IOException { 150 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 151 getAuthToken(request)); 152 153 JsonBean jobBean = null; 154 String jobId = getResourceName(request); 155 try { 156 jobBean = (JsonBean) dagEngine.getJob(jobId); 157 } 158 catch (DagEngineException ex) { 159 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 160 } 161 162 return jobBean; 163 } 164 165 /* 166 * v0 service method to get a job definition in String format 167 */ 168 @Override 169 protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response) 170 throws XServletException, IOException { 171 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 172 getAuthToken(request)); 173 174 String wfDefinition = null; 175 String jobId = getResourceName(request); 176 try { 177 wfDefinition = dagEngine.getDefinition(jobId); 178 } 179 catch (DagEngineException ex) { 180 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 181 } 182 return wfDefinition; 183 } 184 185 /* 186 * v0 service method to stream a job log into response object 187 */ 188 @Override 189 protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, 190 IOException { 191 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request), 192 getAuthToken(request)); 193 194 String jobId = getResourceName(request); 195 try { 196 dagEngine.streamLog(jobId, response.getWriter()); 197 } 198 catch (DagEngineException ex) { 199 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); 200 } 201 } 202 203 /* 204 * Not implemented in v0 205 */ 206 @Override 207 protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response) 208 throws XServletException, IOException { 209 // Should this error code be NOT_IMPLEMENTED? 210 throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306); 211 } 212 }