001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.client; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileReader; 023 import java.io.IOException; 024 import java.io.InputStreamReader; 025 import java.net.HttpURLConnection; 026 import java.util.Properties; 027 028 import org.apache.oozie.cli.OozieCLI; 029 import org.apache.oozie.client.rest.JsonTags; 030 import org.apache.oozie.client.rest.RestConstants; 031 import org.json.simple.JSONObject; 032 import org.json.simple.JSONValue; 033 034 public class XOozieClient extends OozieClient { 035 036 public static final String JT = "mapred.job.tracker"; 037 public static final String JT_2 = "mapreduce.jobtracker.address"; 038 039 public static final String NN = "fs.default.name"; 040 public static final String NN_2 = "fs.defaultFS"; 041 042 @Deprecated 043 public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal"; 044 045 @Deprecated 046 public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal"; 047 048 public static final String PIG_SCRIPT = "oozie.pig.script"; 049 050 public static final String PIG_OPTIONS = "oozie.pig.options"; 051 052 public static final String PIG_SCRIPT_PARAMS = "oozie.pig.script.params"; 053 054 public static final String HIVE_SCRIPT = "oozie.hive.script"; 055 056 public static final String HIVE_OPTIONS = "oozie.hive.options"; 057 058 public static final String HIVE_SCRIPT_PARAMS = "oozie.hive.script.params"; 059 060 public static final String FILES = "oozie.files"; 061 062 public static final String ARCHIVES = "oozie.archives"; 063 064 public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission"; 065 066 protected XOozieClient() { 067 } 068 069 /** 070 * Create an eXtended Workflow client instance. 071 * 072 * @param oozieUrl URL of the Oozie instance it will interact with. 073 */ 074 public XOozieClient(String oozieUrl) { 075 super(oozieUrl); 076 } 077 078 private String readScript(String script) throws IOException { 079 if (!new File(script).exists()) { 080 throw new IOException("Error: script file [" + script + "] does not exist"); 081 } 082 083 BufferedReader br = null; 084 try { 085 br = new BufferedReader(new FileReader(script)); 086 StringBuilder sb = new StringBuilder(); 087 String line; 088 while ((line = br.readLine()) != null) { 089 sb.append(line + "\n"); 090 } 091 return sb.toString(); 092 } 093 finally { 094 try { 095 br.close(); 096 } 097 catch (IOException ex) { 098 System.err.println("Error: " + ex.getMessage()); 099 } 100 } 101 } 102 103 static void setStrings(Properties conf, String key, String[] values) { 104 if (values != null) { 105 conf.setProperty(key + ".size", (new Integer(values.length)).toString()); 106 for (int i = 0; i < values.length; i++) { 107 conf.setProperty(key + "." + i, values[i]); 108 } 109 } 110 } 111 112 private void validateHttpSubmitConf(Properties conf) { 113 String JT = conf.getProperty(XOozieClient.JT); 114 String JT_2 = conf.getProperty(XOozieClient.JT_2); 115 if (JT == null) { 116 if(JT_2 == null) { 117 throw new RuntimeException("jobtracker is not specified in conf"); 118 } 119 } 120 121 String NN = conf.getProperty(XOozieClient.NN); 122 String NN_2 = conf.getProperty(XOozieClient.NN_2); 123 if (NN == null) { 124 if(NN_2 == null) { 125 throw new RuntimeException("namenode is not specified in conf"); 126 } else { 127 NN = NN_2; 128 } 129 } 130 131 String libPath = conf.getProperty(LIBPATH); 132 if (libPath == null) { 133 throw new RuntimeException("libpath is not specified in conf"); 134 } 135 if (!libPath.contains(":/")) { 136 String newLibPath; 137 if (libPath.startsWith("/")) { 138 if(NN.endsWith("/")) { 139 newLibPath = NN + libPath.substring(1); 140 } else { 141 newLibPath = NN + libPath; 142 } 143 } else { 144 throw new RuntimeException("libpath should be absolute"); 145 } 146 conf.setProperty(LIBPATH, newLibPath); 147 } 148 149 conf.setProperty(IS_PROXY_SUBMISSION, "true"); 150 } 151 152 /** 153 * Submit a Pig job via HTTP. 154 * 155 * @param conf job configuration. 156 * @param pigScriptFile pig script file. 157 * @param pigArgs pig arguments string. 158 * @return the job Id. 159 * @throws OozieClientException thrown if the job could not be submitted. 160 */ 161 @Deprecated 162 public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException { 163 return submitScriptLanguage(conf, pigScriptFile, pigArgs, OozieCLI.PIG_CMD); 164 } 165 166 /** 167 * Submit a Pig or Hive job via HTTP. 168 * 169 * @param conf job configuration. 170 * @param scriptFile script file. 171 * @param args arguments string. 172 * @return the job Id. 173 * @throws OozieClientException thrown if the job could not be submitted. 174 */ 175 public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String jobType) 176 throws IOException, OozieClientException { 177 return submitScriptLanguage(conf, scriptFile, args, null, jobType); 178 } 179 180 /** 181 * Submit a Pig or Hive job via HTTP. 182 * 183 * @param conf job configuration. 184 * @param scriptFile script file. 185 * @param args arguments string. 186 * @param params parameters string. 187 * @return the job Id. 188 * @throws OozieClientException thrown if the job could not be submitted. 189 */ 190 public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String[] params, String jobType) 191 throws IOException, OozieClientException { 192 if (conf == null) { 193 throw new IllegalArgumentException("conf cannot be null"); 194 } 195 if (scriptFile == null) { 196 throw new IllegalArgumentException("scriptFile cannot be null"); 197 } 198 199 validateHttpSubmitConf(conf); 200 201 String script = ""; 202 String options = ""; 203 String scriptParams = ""; 204 205 if (jobType.equals(OozieCLI.HIVE_CMD)) { 206 script = XOozieClient.HIVE_SCRIPT; 207 options = XOozieClient.HIVE_OPTIONS; 208 scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS; 209 } 210 else if (jobType.equals(OozieCLI.PIG_CMD)) { 211 script = XOozieClient.PIG_SCRIPT; 212 options = XOozieClient.PIG_OPTIONS; 213 scriptParams = XOozieClient.PIG_SCRIPT_PARAMS; 214 } 215 else { 216 throw new IllegalArgumentException("jobType must be either pig or hive"); 217 } 218 219 conf.setProperty(script, readScript(scriptFile)); 220 setStrings(conf, options, args); 221 setStrings(conf, scriptParams, params); 222 223 return (new HttpJobSubmit(conf, jobType)).call(); 224 } 225 226 /** 227 * Submit a Map/Reduce job via HTTP. 228 * 229 * @param conf job configuration. 230 * @return the job Id. 231 * @throws OozieClientException thrown if the job could not be submitted. 232 */ 233 public String submitMapReduce(Properties conf) throws OozieClientException { 234 if (conf == null) { 235 throw new IllegalArgumentException("conf cannot be null"); 236 } 237 238 validateHttpSubmitConf(conf); 239 240 return (new HttpJobSubmit(conf, "mapreduce")).call(); 241 } 242 243 private class HttpJobSubmit extends ClientCallable<String> { 244 private Properties conf; 245 246 HttpJobSubmit(Properties conf, String jobType) { 247 super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType)); 248 this.conf = notNull(conf, "conf"); 249 } 250 251 @Override 252 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 253 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 254 writeToXml(conf, conn.getOutputStream()); 255 if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) { 256 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream())); 257 return (String) json.get(JsonTags.JOB_ID); 258 } 259 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { 260 handleError(conn); 261 } 262 return null; 263 } 264 } 265 266 /** 267 * set LIBPATH for HTTP submission job. 268 * 269 * @param conf Configuration object. 270 * @param pathStr lib HDFS path. 271 */ 272 public void setLib(Properties conf, String pathStr) { 273 conf.setProperty(LIBPATH, pathStr); 274 } 275 276 /** 277 * The equivalent to <file> tag in oozie's workflow xml. 278 * 279 * @param conf Configuration object. 280 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name. 281 * For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as 282 * symbolic link name. 283 */ 284 public void addFile(Properties conf, String file) { 285 if (file == null || file.length() == 0) { 286 throw new IllegalArgumentException("file cannot be null or empty"); 287 } 288 String files = conf.getProperty(FILES); 289 conf.setProperty(FILES, files == null ? file : files + "," + file); 290 } 291 292 /** 293 * The equivalent to <archive> tag in oozie's workflow xml. 294 * 295 * @param conf Configuration object. 296 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name. 297 * For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as 298 * symbolic link name. 299 */ 300 public void addArchive(Properties conf, String file) { 301 if (file == null || file.length() == 0) { 302 throw new IllegalArgumentException("file cannot be null or empty"); 303 } 304 String files = conf.getProperty(ARCHIVES); 305 conf.setProperty(ARCHIVES, files == null ? file : files + "," + file); 306 } 307 }