001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.client; 020 021import java.io.BufferedReader; 022import java.io.File; 023import java.io.FileReader; 024import java.io.IOException; 025import java.io.InputStreamReader; 026import java.net.HttpURLConnection; 027import java.util.Properties; 028 029import org.apache.oozie.cli.OozieCLI; 030import org.apache.oozie.client.rest.JsonTags; 031import org.apache.oozie.client.rest.RestConstants; 032import org.json.simple.JSONObject; 033import org.json.simple.JSONValue; 034 035public class XOozieClient extends OozieClient { 036 037 public static final String JT = "mapred.job.tracker"; 038 public static final String JT_2 = "mapreduce.jobtracker.address"; 039 040 public static final String NN = "fs.default.name"; 041 public static final String NN_2 = "fs.defaultFS"; 042 043 @Deprecated 044 public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal"; 045 046 @Deprecated 047 public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal"; 048 049 public static final String PIG_SCRIPT = "oozie.pig.script"; 050 051 public static final String PIG_OPTIONS = "oozie.pig.options"; 052 053 public static final String PIG_SCRIPT_PARAMS = "oozie.pig.script.params"; 054 055 public static final String HIVE_SCRIPT = "oozie.hive.script"; 056 057 public static final String HIVE_OPTIONS = "oozie.hive.options"; 058 059 public static final String HIVE_SCRIPT_PARAMS = "oozie.hive.script.params"; 060 061 public static final String SQOOP_COMMAND = "oozie.sqoop.command"; 062 063 public static final String SQOOP_OPTIONS = "oozie.sqoop.options"; 064 065 public static final String FILES = "oozie.files"; 066 067 public static final String ARCHIVES = "oozie.archives"; 068 069 public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission"; 070 071 protected XOozieClient() { 072 } 073 074 /** 075 * Create an eXtended Workflow client instance. 076 * 077 * @param oozieUrl URL of the Oozie instance it will interact with. 078 */ 079 public XOozieClient(String oozieUrl) { 080 super(oozieUrl); 081 } 082 083 private String readScript(String script) throws IOException { 084 if (!new File(script).exists()) { 085 throw new IOException("Error: script file [" + script + "] does not exist"); 086 } 087 088 BufferedReader br = null; 089 try { 090 br = new BufferedReader(new FileReader(script)); 091 StringBuilder sb = new StringBuilder(); 092 String line; 093 while ((line = br.readLine()) != null) { 094 sb.append(line + "\n"); 095 } 096 return sb.toString(); 097 } 098 finally { 099 try { 100 br.close(); 101 } 102 catch (IOException ex) { 103 System.err.println("Error: " + ex.getMessage()); 104 } 105 } 106 } 107 108 private String serializeSqoopCommand(String[] command) { 109 StringBuilder sb = new StringBuilder(); 110 for (String arg : command) { 111 sb.append(arg).append("\n"); 112 } 113 return sb.toString(); 114 } 115 116 static void setStrings(Properties conf, String key, String[] values) { 117 if (values != null) { 118 conf.setProperty(key + ".size", (new Integer(values.length)).toString()); 119 for (int i = 0; i < values.length; i++) { 120 conf.setProperty(key + "." + i, values[i]); 121 } 122 } 123 } 124 125 private void validateHttpSubmitConf(Properties conf) { 126 String JT = conf.getProperty(XOozieClient.JT); 127 String JT_2 = conf.getProperty(XOozieClient.JT_2); 128 if (JT == null) { 129 if(JT_2 == null) { 130 throw new RuntimeException("jobtracker is not specified in conf"); 131 } 132 } 133 134 String NN = conf.getProperty(XOozieClient.NN); 135 String NN_2 = conf.getProperty(XOozieClient.NN_2); 136 if (NN == null) { 137 if(NN_2 == null) { 138 throw new RuntimeException("namenode is not specified in conf"); 139 } else { 140 NN = NN_2; 141 } 142 } 143 144 String libPath = conf.getProperty(LIBPATH); 145 if (libPath == null) { 146 throw new RuntimeException("libpath is not specified in conf"); 147 } 148 if (!libPath.contains(":/")) { 149 String newLibPath; 150 if (libPath.startsWith("/")) { 151 if(NN.endsWith("/")) { 152 newLibPath = NN + libPath.substring(1); 153 } else { 154 newLibPath = NN + libPath; 155 } 156 } else { 157 throw new RuntimeException("libpath should be absolute"); 158 } 159 conf.setProperty(LIBPATH, newLibPath); 160 } 161 162 conf.setProperty(IS_PROXY_SUBMISSION, "true"); 163 } 164 165 /** 166 * Submit a Pig job via HTTP. 167 * 168 * @param conf job configuration. 169 * @param pigScriptFile pig script file. 170 * @param pigArgs pig arguments string. 171 * @return the job Id. 172 * @throws OozieClientException thrown if the job could not be submitted. 173 */ 174 @Deprecated 175 public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException { 176 return submitScriptLanguage(conf, pigScriptFile, pigArgs, OozieCLI.PIG_CMD); 177 } 178 179 /** 180 * Submit a Pig or Hive job via HTTP. 181 * 182 * @param conf job configuration. 183 * @param scriptFile script file. 184 * @param args arguments string. 185 * @return the job Id. 186 * @throws OozieClientException thrown if the job could not be submitted. 187 */ 188 public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String jobType) 189 throws IOException, OozieClientException { 190 return submitScriptLanguage(conf, scriptFile, args, null, jobType); 191 } 192 193 /** 194 * Submit a Pig or Hive job via HTTP. 195 * 196 * @param conf job configuration. 197 * @param scriptFile script file. 198 * @param args arguments string. 199 * @param params parameters string. 200 * @return the job Id. 201 * @throws OozieClientException thrown if the job could not be submitted. 202 */ 203 public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String[] params, String jobType) 204 throws IOException, OozieClientException { 205 OozieClient.notNull(conf, "conf"); 206 OozieClient.notNull(scriptFile, "scriptFile"); 207 validateHttpSubmitConf(conf); 208 209 String script = ""; 210 String options = ""; 211 String scriptParams = ""; 212 213 if (jobType.equals(OozieCLI.HIVE_CMD)) { 214 script = XOozieClient.HIVE_SCRIPT; 215 options = XOozieClient.HIVE_OPTIONS; 216 scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS; 217 } 218 else if (jobType.equals(OozieCLI.PIG_CMD)) { 219 script = XOozieClient.PIG_SCRIPT; 220 options = XOozieClient.PIG_OPTIONS; 221 scriptParams = XOozieClient.PIG_SCRIPT_PARAMS; 222 } 223 else { 224 throw new IllegalArgumentException("jobType must be either pig or hive"); 225 } 226 227 conf.setProperty(script, readScript(scriptFile)); 228 setStrings(conf, options, args); 229 setStrings(conf, scriptParams, params); 230 231 return (new HttpJobSubmit(conf, jobType)).call(); 232 } 233 234 /** 235 * Submit a Sqoop job via HTTP. 236 * 237 * @param conf job configuration. 238 * @param command sqoop command to run. 239 * @param args arguments string. 240 * @return the job Id. 241 * @throws OozieClientException thrown if the job could not be submitted. 242 */ 243 public String submitSqoop(Properties conf, String[] command, String[] args) 244 throws OozieClientException { 245 OozieClient.notNull(conf, "conf"); 246 OozieClient.notNull(command, "command"); 247 validateHttpSubmitConf(conf); 248 249 conf.setProperty(XOozieClient.SQOOP_COMMAND, serializeSqoopCommand(command)); 250 setStrings(conf, XOozieClient.SQOOP_OPTIONS, args); 251 252 return (new HttpJobSubmit(conf, OozieCLI.SQOOP_CMD)).call(); 253 } 254 255 /** 256 * Submit a Map/Reduce job via HTTP. 257 * 258 * @param conf job configuration. 259 * @return the job Id. 260 * @throws OozieClientException thrown if the job could not be submitted. 261 */ 262 public String submitMapReduce(Properties conf) throws OozieClientException { 263 OozieClient.notNull(conf, "conf"); 264 validateHttpSubmitConf(conf); 265 266 return (new HttpJobSubmit(conf, "mapreduce")).call(); 267 } 268 269 private class HttpJobSubmit extends ClientCallable<String> { 270 private Properties conf; 271 272 HttpJobSubmit(Properties conf, String jobType) { 273 super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType)); 274 this.conf = notNull(conf, "conf"); 275 } 276 277 @Override 278 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 279 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 280 writeToXml(conf, conn.getOutputStream()); 281 if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) { 282 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream())); 283 return (String) json.get(JsonTags.JOB_ID); 284 } 285 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { 286 handleError(conn); 287 } 288 return null; 289 } 290 } 291 292 /** 293 * set LIBPATH for HTTP submission job. 294 * 295 * @param conf Configuration object. 296 * @param pathStr lib HDFS path. 297 */ 298 public void setLib(Properties conf, String pathStr) { 299 conf.setProperty(LIBPATH, pathStr); 300 } 301 302 /** 303 * The equivalent to <file> tag in oozie's workflow xml. 304 * 305 * @param conf Configuration object. 306 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name. 307 * For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as 308 * symbolic link name. 309 */ 310 public void addFile(Properties conf, String file) { 311 OozieClient.notEmpty(file, "file"); 312 String files = conf.getProperty(FILES); 313 conf.setProperty(FILES, files == null ? file : files + "," + file); 314 } 315 316 /** 317 * The equivalent to <archive> tag in oozie's workflow xml. 318 * 319 * @param conf Configuration object. 320 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name. 321 * For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as 322 * symbolic link name. 323 */ 324 public void addArchive(Properties conf, String file) { 325 OozieClient.notEmpty(file, "file"); 326 String files = conf.getProperty(ARCHIVES); 327 conf.setProperty(ARCHIVES, files == null ? file : files + "," + file); 328 } 329}