001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.client; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileReader; 023 import java.io.IOException; 024 import java.io.InputStreamReader; 025 import java.net.HttpURLConnection; 026 import java.util.Properties; 027 028 import org.apache.oozie.client.rest.JsonTags; 029 import org.apache.oozie.client.rest.RestConstants; 030 import org.json.simple.JSONObject; 031 import org.json.simple.JSONValue; 032 033 public class XOozieClient extends OozieClient { 034 035 public static final String JT = "mapred.job.tracker"; 036 public static final String JT_2 = "mapreduce.jobtracker.address"; 037 038 public static final String NN = "fs.default.name"; 039 public static final String NN_2 = "fs.defaultFS"; 040 041 @Deprecated 042 public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal"; 043 044 @Deprecated 045 public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal"; 046 047 public static final String PIG_SCRIPT = "oozie.pig.script"; 048 049 public static final String PIG_OPTIONS = "oozie.pig.options"; 050 051 public static final String FILES = "oozie.files"; 052 053 public static final String ARCHIVES = "oozie.archives"; 054 055 public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission"; 056 057 protected XOozieClient() { 058 } 059 060 /** 061 * Create an eXtended Workflow client instance. 062 * 063 * @param oozieUrl URL of the Oozie instance it will interact with. 064 */ 065 public XOozieClient(String oozieUrl) { 066 super(oozieUrl); 067 } 068 069 private String readPigScript(String script) throws IOException { 070 if (!new File(script).exists()) { 071 throw new IOException("Error: Pig script file [" + script + "] does not exist"); 072 } 073 074 BufferedReader br = null; 075 try { 076 br = new BufferedReader(new FileReader(script)); 077 StringBuilder sb = new StringBuilder(); 078 String line; 079 while ((line = br.readLine()) != null) { 080 sb.append(line + "\n"); 081 } 082 return sb.toString(); 083 } 084 finally { 085 try { 086 br.close(); 087 } 088 catch (IOException ex) { 089 System.err.println("Error: " + ex.getMessage()); 090 } 091 } 092 } 093 094 static void setStrings(Properties conf, String key, String[] values) { 095 if (values != null) { 096 conf.setProperty(key + ".size", (new Integer(values.length)).toString()); 097 for (int i = 0; i < values.length; i++) { 098 conf.setProperty(key + "." + i, values[i]); 099 } 100 } 101 } 102 103 private void validateHttpSubmitConf(Properties conf) { 104 String JT = conf.getProperty(XOozieClient.JT); 105 String JT_2 = conf.getProperty(XOozieClient.JT_2); 106 if (JT == null) { 107 if(JT_2 == null) { 108 throw new RuntimeException("jobtracker is not specified in conf"); 109 } 110 } 111 112 String NN = conf.getProperty(XOozieClient.NN); 113 String NN_2 = conf.getProperty(XOozieClient.NN_2); 114 if (NN == null) { 115 if(NN_2 == null) { 116 throw new RuntimeException("namenode is not specified in conf"); 117 } 118 } 119 120 String libPath = conf.getProperty(LIBPATH); 121 if (libPath == null) { 122 throw new RuntimeException("libpath is not specified in conf"); 123 } 124 if (!libPath.contains(":/")) { 125 String newLibPath; 126 if (libPath.startsWith("/")) { 127 if(NN.endsWith("/")) { 128 newLibPath = NN + libPath.substring(1); 129 } else { 130 newLibPath = NN + libPath; 131 } 132 } else { 133 throw new RuntimeException("libpath should be absolute"); 134 } 135 conf.setProperty(LIBPATH, newLibPath); 136 } 137 138 conf.setProperty(IS_PROXY_SUBMISSION, "true"); 139 } 140 141 /** 142 * Submit a Pig job via HTTP. 143 * 144 * @param conf job configuration. 145 * @param pigScriptFile pig script file. 146 * @param pigArgs pig arguments string. 147 * @return the job Id. 148 * @throws OozieClientException thrown if the job could not be submitted. 149 */ 150 public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException { 151 if (conf == null) { 152 throw new IllegalArgumentException("conf cannot be null"); 153 } 154 if (pigScriptFile == null) { 155 throw new IllegalArgumentException("pigScriptFile cannot be null"); 156 } 157 158 validateHttpSubmitConf(conf); 159 160 conf.setProperty(XOozieClient.PIG_SCRIPT, readPigScript(pigScriptFile)); 161 setStrings(conf, XOozieClient.PIG_OPTIONS, pigArgs); 162 163 return (new HttpJobSubmit(conf, "pig")).call(); 164 } 165 166 /** 167 * Submit a Map/Reduce job via HTTP. 168 * 169 * @param conf job configuration. 170 * @return the job Id. 171 * @throws OozieClientException thrown if the job could not be submitted. 172 */ 173 public String submitMapReduce(Properties conf) throws OozieClientException { 174 if (conf == null) { 175 throw new IllegalArgumentException("conf cannot be null"); 176 } 177 178 validateHttpSubmitConf(conf); 179 180 return (new HttpJobSubmit(conf, "mapreduce")).call(); 181 } 182 183 private class HttpJobSubmit extends ClientCallable<String> { 184 private Properties conf; 185 186 HttpJobSubmit(Properties conf, String jobType) { 187 super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType)); 188 this.conf = notNull(conf, "conf"); 189 } 190 191 @Override 192 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 193 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 194 writeToXml(conf, conn.getOutputStream()); 195 if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) { 196 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream())); 197 return (String) json.get(JsonTags.JOB_ID); 198 } 199 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { 200 handleError(conn); 201 } 202 return null; 203 } 204 } 205 206 /** 207 * set LIBPATH for HTTP submission job. 208 * 209 * @param conf Configuration object. 210 * @param pathStr lib HDFS path. 211 */ 212 public void setLib(Properties conf, String pathStr) { 213 conf.setProperty(LIBPATH, pathStr); 214 } 215 216 /** 217 * The equivalent to <file> tag in oozie's workflow xml. 218 * 219 * @param conf Configuration object. 220 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name. 221 * For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as 222 * symbolic link name. 223 */ 224 public void addFile(Properties conf, String file) { 225 if (file == null || file.length() == 0) { 226 throw new IllegalArgumentException("file cannot be null or empty"); 227 } 228 String files = conf.getProperty(FILES); 229 conf.setProperty(FILES, files == null ? file : files + "," + file); 230 } 231 232 /** 233 * The equivalent to <archive> tag in oozie's workflow xml. 234 * 235 * @param conf Configuration object. 236 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name. 237 * For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as 238 * symbolic link name. 239 */ 240 public void addArchive(Properties conf, String file) { 241 if (file == null || file.length() == 0) { 242 throw new IllegalArgumentException("file cannot be null or empty"); 243 } 244 String files = conf.getProperty(ARCHIVES); 245 conf.setProperty(ARCHIVES, files == null ? file : files + "," + file); 246 } 247 }