001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.client; 019 020import java.io.BufferedReader; 021import java.io.File; 022import java.io.FileReader; 023import java.io.IOException; 024import java.io.InputStreamReader; 025import java.net.HttpURLConnection; 026import java.util.Properties; 027 028import org.apache.oozie.cli.OozieCLI; 029import org.apache.oozie.client.rest.JsonTags; 030import org.apache.oozie.client.rest.RestConstants; 031import org.json.simple.JSONObject; 032import org.json.simple.JSONValue; 033 034public class XOozieClient extends OozieClient { 035 036 public static final String JT = "mapred.job.tracker"; 037 public static final String JT_2 = "mapreduce.jobtracker.address"; 038 039 public static final String NN = "fs.default.name"; 040 public static final String NN_2 = "fs.defaultFS"; 041 042 @Deprecated 043 public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal"; 044 045 @Deprecated 046 public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal"; 047 048 public static final String PIG_SCRIPT = "oozie.pig.script"; 049 050 public static final String PIG_OPTIONS = "oozie.pig.options"; 051 052 public static final String PIG_SCRIPT_PARAMS = "oozie.pig.script.params"; 053 054 public static final String HIVE_SCRIPT = "oozie.hive.script"; 055 056 public static final String HIVE_OPTIONS = "oozie.hive.options"; 057 058 public static final String HIVE_SCRIPT_PARAMS = "oozie.hive.script.params"; 059 060 public static final String SQOOP_COMMAND = "oozie.sqoop.command"; 061 062 public static final String SQOOP_OPTIONS = "oozie.sqoop.options"; 063 064 public static final String FILES = "oozie.files"; 065 066 public static final String ARCHIVES = "oozie.archives"; 067 068 public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission"; 069 070 protected XOozieClient() { 071 } 072 073 /** 074 * Create an eXtended Workflow client instance. 075 * 076 * @param oozieUrl URL of the Oozie instance it will interact with. 077 */ 078 public XOozieClient(String oozieUrl) { 079 super(oozieUrl); 080 } 081 082 private String readScript(String script) throws IOException { 083 if (!new File(script).exists()) { 084 throw new IOException("Error: script file [" + script + "] does not exist"); 085 } 086 087 BufferedReader br = null; 088 try { 089 br = new BufferedReader(new FileReader(script)); 090 StringBuilder sb = new StringBuilder(); 091 String line; 092 while ((line = br.readLine()) != null) { 093 sb.append(line + "\n"); 094 } 095 return sb.toString(); 096 } 097 finally { 098 try { 099 br.close(); 100 } 101 catch (IOException ex) { 102 System.err.println("Error: " + ex.getMessage()); 103 } 104 } 105 } 106 107 private String serializeSqoopCommand(String[] command) { 108 StringBuilder sb = new StringBuilder(); 109 for (String arg : command) { 110 sb.append(arg).append("\n"); 111 } 112 return sb.toString(); 113 } 114 115 static void setStrings(Properties conf, String key, String[] values) { 116 if (values != null) { 117 conf.setProperty(key + ".size", (new Integer(values.length)).toString()); 118 for (int i = 0; i < values.length; i++) { 119 conf.setProperty(key + "." + i, values[i]); 120 } 121 } 122 } 123 124 private void validateHttpSubmitConf(Properties conf) { 125 String JT = conf.getProperty(XOozieClient.JT); 126 String JT_2 = conf.getProperty(XOozieClient.JT_2); 127 if (JT == null) { 128 if(JT_2 == null) { 129 throw new RuntimeException("jobtracker is not specified in conf"); 130 } 131 } 132 133 String NN = conf.getProperty(XOozieClient.NN); 134 String NN_2 = conf.getProperty(XOozieClient.NN_2); 135 if (NN == null) { 136 if(NN_2 == null) { 137 throw new RuntimeException("namenode is not specified in conf"); 138 } else { 139 NN = NN_2; 140 } 141 } 142 143 String libPath = conf.getProperty(LIBPATH); 144 if (libPath == null) { 145 throw new RuntimeException("libpath is not specified in conf"); 146 } 147 if (!libPath.contains(":/")) { 148 String newLibPath; 149 if (libPath.startsWith("/")) { 150 if(NN.endsWith("/")) { 151 newLibPath = NN + libPath.substring(1); 152 } else { 153 newLibPath = NN + libPath; 154 } 155 } else { 156 throw new RuntimeException("libpath should be absolute"); 157 } 158 conf.setProperty(LIBPATH, newLibPath); 159 } 160 161 conf.setProperty(IS_PROXY_SUBMISSION, "true"); 162 } 163 164 /** 165 * Submit a Pig job via HTTP. 166 * 167 * @param conf job configuration. 168 * @param pigScriptFile pig script file. 169 * @param pigArgs pig arguments string. 170 * @return the job Id. 171 * @throws OozieClientException thrown if the job could not be submitted. 172 */ 173 @Deprecated 174 public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException { 175 return submitScriptLanguage(conf, pigScriptFile, pigArgs, OozieCLI.PIG_CMD); 176 } 177 178 /** 179 * Submit a Pig or Hive job via HTTP. 180 * 181 * @param conf job configuration. 182 * @param scriptFile script file. 183 * @param args arguments string. 184 * @return the job Id. 185 * @throws OozieClientException thrown if the job could not be submitted. 186 */ 187 public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String jobType) 188 throws IOException, OozieClientException { 189 return submitScriptLanguage(conf, scriptFile, args, null, jobType); 190 } 191 192 /** 193 * Submit a Pig or Hive job via HTTP. 194 * 195 * @param conf job configuration. 196 * @param scriptFile script file. 197 * @param args arguments string. 198 * @param params parameters string. 199 * @return the job Id. 200 * @throws OozieClientException thrown if the job could not be submitted. 201 */ 202 public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String[] params, String jobType) 203 throws IOException, OozieClientException { 204 if (conf == null) { 205 throw new IllegalArgumentException("conf cannot be null"); 206 } 207 if (scriptFile == null) { 208 throw new IllegalArgumentException("scriptFile cannot be null"); 209 } 210 211 validateHttpSubmitConf(conf); 212 213 String script = ""; 214 String options = ""; 215 String scriptParams = ""; 216 217 if (jobType.equals(OozieCLI.HIVE_CMD)) { 218 script = XOozieClient.HIVE_SCRIPT; 219 options = XOozieClient.HIVE_OPTIONS; 220 scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS; 221 } 222 else if (jobType.equals(OozieCLI.PIG_CMD)) { 223 script = XOozieClient.PIG_SCRIPT; 224 options = XOozieClient.PIG_OPTIONS; 225 scriptParams = XOozieClient.PIG_SCRIPT_PARAMS; 226 } 227 else { 228 throw new IllegalArgumentException("jobType must be either pig or hive"); 229 } 230 231 conf.setProperty(script, readScript(scriptFile)); 232 setStrings(conf, options, args); 233 setStrings(conf, scriptParams, params); 234 235 return (new HttpJobSubmit(conf, jobType)).call(); 236 } 237 238 /** 239 * Submit a Sqoop job via HTTP. 240 * 241 * @param conf job configuration. 242 * @param command sqoop command to run. 243 * @param args arguments string. 244 * @return the job Id. 245 * @throws OozieClientException thrown if the job could not be submitted. 246 */ 247 public String submitSqoop(Properties conf, String[] command, String[] args) 248 throws OozieClientException { 249 if (conf == null) { 250 throw new IllegalArgumentException("conf cannot be null"); 251 } 252 if (command == null) { 253 throw new IllegalArgumentException("command cannot be null"); 254 } 255 256 validateHttpSubmitConf(conf); 257 258 conf.setProperty(XOozieClient.SQOOP_COMMAND, serializeSqoopCommand(command)); 259 setStrings(conf, XOozieClient.SQOOP_OPTIONS, args); 260 261 return (new HttpJobSubmit(conf, OozieCLI.SQOOP_CMD)).call(); 262 } 263 264 /** 265 * Submit a Map/Reduce job via HTTP. 266 * 267 * @param conf job configuration. 268 * @return the job Id. 269 * @throws OozieClientException thrown if the job could not be submitted. 270 */ 271 public String submitMapReduce(Properties conf) throws OozieClientException { 272 if (conf == null) { 273 throw new IllegalArgumentException("conf cannot be null"); 274 } 275 276 validateHttpSubmitConf(conf); 277 278 return (new HttpJobSubmit(conf, "mapreduce")).call(); 279 } 280 281 private class HttpJobSubmit extends ClientCallable<String> { 282 private Properties conf; 283 284 HttpJobSubmit(Properties conf, String jobType) { 285 super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType)); 286 this.conf = notNull(conf, "conf"); 287 } 288 289 @Override 290 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 291 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 292 writeToXml(conf, conn.getOutputStream()); 293 if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) { 294 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream())); 295 return (String) json.get(JsonTags.JOB_ID); 296 } 297 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { 298 handleError(conn); 299 } 300 return null; 301 } 302 } 303 304 /** 305 * set LIBPATH for HTTP submission job. 306 * 307 * @param conf Configuration object. 308 * @param pathStr lib HDFS path. 309 */ 310 public void setLib(Properties conf, String pathStr) { 311 conf.setProperty(LIBPATH, pathStr); 312 } 313 314 /** 315 * The equivalent to <file> tag in oozie's workflow xml. 316 * 317 * @param conf Configuration object. 318 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name. 319 * For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as 320 * symbolic link name. 321 */ 322 public void addFile(Properties conf, String file) { 323 if (file == null || file.length() == 0) { 324 throw new IllegalArgumentException("file cannot be null or empty"); 325 } 326 String files = conf.getProperty(FILES); 327 conf.setProperty(FILES, files == null ? file : files + "," + file); 328 } 329 330 /** 331 * The equivalent to <archive> tag in oozie's workflow xml. 332 * 333 * @param conf Configuration object. 334 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name. 335 * For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as 336 * symbolic link name. 337 */ 338 public void addArchive(Properties conf, String file) { 339 if (file == null || file.length() == 0) { 340 throw new IllegalArgumentException("file cannot be null or empty"); 341 } 342 String files = conf.getProperty(ARCHIVES); 343 conf.setProperty(ARCHIVES, files == null ? file : files + "," + file); 344 } 345}