001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.client; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileReader; 023 import java.io.IOException; 024 import java.io.InputStreamReader; 025 import java.net.HttpURLConnection; 026 import java.util.Properties; 027 028 import org.apache.oozie.client.rest.JsonTags; 029 import org.apache.oozie.client.rest.RestConstants; 030 import org.json.simple.JSONObject; 031 import org.json.simple.JSONValue; 032 033 public class XOozieClient extends OozieClient { 034 035 public static final String JT = "mapred.job.tracker"; 036 037 public static final String NN = "fs.default.name"; 038 039 public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal"; 040 041 public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal"; 042 043 public static final String PIG_SCRIPT = "oozie.pig.script"; 044 045 public static final String PIG_OPTIONS = "oozie.pig.options"; 046 047 public static final String FILES = "oozie.files"; 048 049 public static final String ARCHIVES = "oozie.archives"; 050 051 public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission"; 052 053 protected XOozieClient() { 054 } 055 056 /** 057 * Create an eXtended Workflow client instance. 058 * 059 * @param oozieUrl URL of the Oozie instance it will interact with. 060 */ 061 public XOozieClient(String oozieUrl) { 062 super(oozieUrl); 063 } 064 065 private String readPigScript(String script) throws IOException { 066 if (!new File(script).exists()) { 067 throw new IOException("Error: Pig script file [" + script + "] does not exist"); 068 } 069 070 BufferedReader br = null; 071 try { 072 br = new BufferedReader(new FileReader(script)); 073 StringBuilder sb = new StringBuilder(); 074 String line; 075 while ((line = br.readLine()) != null) { 076 sb.append(line + "\n"); 077 } 078 return sb.toString(); 079 } 080 finally { 081 try { 082 br.close(); 083 } 084 catch (IOException ex) { 085 System.err.println("Error: " + ex.getMessage()); 086 } 087 } 088 } 089 090 static void setStrings(Properties conf, String key, String[] values) { 091 if (values != null) { 092 conf.setProperty(key + ".size", (new Integer(values.length)).toString()); 093 for (int i = 0; i < values.length; i++) { 094 conf.setProperty(key + "." + i, values[i]); 095 } 096 } 097 } 098 099 private void validateHttpSubmitConf(Properties conf) { 100 String JT = conf.getProperty(XOozieClient.JT); 101 if (JT == null) { 102 throw new RuntimeException("jobtracker is not specified in conf"); 103 } 104 105 String NN = conf.getProperty(XOozieClient.NN); 106 if (NN == null) { 107 throw new RuntimeException("namenode is not specified in conf"); 108 } 109 110 String libPath = conf.getProperty(LIBPATH); 111 if (libPath == null) { 112 throw new RuntimeException("libpath is not specified in conf"); 113 } 114 if (!libPath.startsWith("hdfs://")) { 115 String newLibPath = NN + libPath; 116 conf.setProperty(LIBPATH, newLibPath); 117 } 118 119 conf.setProperty(IS_PROXY_SUBMISSION, "true"); 120 } 121 122 /** 123 * Submit a Pig job via HTTP. 124 * 125 * @param conf job configuration. 126 * @param pigScriptFile pig script file. 127 * @param pigArgs pig arguments string. 128 * @return the job Id. 129 * @throws OozieClientException thrown if the job could not be submitted. 130 */ 131 public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException { 132 if (conf == null) { 133 throw new IllegalArgumentException("conf cannot be null"); 134 } 135 if (pigScriptFile == null) { 136 throw new IllegalArgumentException("pigScriptFile cannot be null"); 137 } 138 139 validateHttpSubmitConf(conf); 140 141 conf.setProperty(XOozieClient.PIG_SCRIPT, readPigScript(pigScriptFile)); 142 setStrings(conf, XOozieClient.PIG_OPTIONS, pigArgs); 143 144 return (new HttpJobSubmit(conf, "pig")).call(); 145 } 146 147 /** 148 * Submit a Map/Reduce job via HTTP. 149 * 150 * @param conf job configuration. 151 * @return the job Id. 152 * @throws OozieClientException thrown if the job could not be submitted. 153 */ 154 public String submitMapReduce(Properties conf) throws OozieClientException { 155 if (conf == null) { 156 throw new IllegalArgumentException("conf cannot be null"); 157 } 158 159 validateHttpSubmitConf(conf); 160 161 return (new HttpJobSubmit(conf, "mapreduce")).call(); 162 } 163 164 private class HttpJobSubmit extends ClientCallable<String> { 165 private Properties conf; 166 167 HttpJobSubmit(Properties conf, String jobType) { 168 super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType)); 169 this.conf = notNull(conf, "conf"); 170 } 171 172 @Override 173 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 174 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 175 writeToXml(conf, conn.getOutputStream()); 176 if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) { 177 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream())); 178 return (String) json.get(JsonTags.JOB_ID); 179 } 180 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { 181 handleError(conn); 182 } 183 return null; 184 } 185 } 186 187 /** 188 * set LIBPATH for HTTP submission job. 189 * 190 * @param conf Configuration object. 191 * @param path lib HDFS path. 192 */ 193 public void setLib(Properties conf, String pathStr) { 194 conf.setProperty(LIBPATH, pathStr); 195 } 196 197 /** 198 * The equivalent to <file> tag in oozie's workflow xml. 199 * 200 * @param conf Configuration object. 201 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name. 202 * For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as 203 * symbolic link name. 204 */ 205 public void addFile(Properties conf, String file) { 206 if (file == null || file.length() == 0) { 207 throw new IllegalArgumentException("file cannot be null or empty"); 208 } 209 String files = conf.getProperty(FILES); 210 conf.setProperty(FILES, files == null ? file : files + "," + file); 211 } 212 213 /** 214 * The equivalent to <archive> tag in oozie's workflow xml. 215 * 216 * @param conf Configuration object. 217 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name. 218 * For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as 219 * symbolic link name. 220 */ 221 public void addArchive(Properties conf, String file) { 222 if (file == null || file.length() == 0) { 223 throw new IllegalArgumentException("file cannot be null or empty"); 224 } 225 String files = conf.getProperty(ARCHIVES); 226 conf.setProperty(ARCHIVES, files == null ? file : files + "," + file); 227 } 228 }