This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.client;
019
020 import java.io.BufferedReader;
021 import java.io.File;
022 import java.io.FileReader;
023 import java.io.IOException;
024 import java.io.InputStreamReader;
025 import java.net.HttpURLConnection;
026 import java.util.Properties;
027
028 import org.apache.oozie.cli.OozieCLI;
029 import org.apache.oozie.client.rest.JsonTags;
030 import org.apache.oozie.client.rest.RestConstants;
031 import org.json.simple.JSONObject;
032 import org.json.simple.JSONValue;
033
034 public class XOozieClient extends OozieClient {
035
036 public static final String JT = "mapred.job.tracker";
037 public static final String JT_2 = "mapreduce.jobtracker.address";
038
039 public static final String NN = "fs.default.name";
040 public static final String NN_2 = "fs.defaultFS";
041
042 @Deprecated
043 public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
044
045 @Deprecated
046 public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
047
048 public static final String PIG_SCRIPT = "oozie.pig.script";
049
050 public static final String PIG_OPTIONS = "oozie.pig.options";
051
052 public static final String PIG_SCRIPT_PARAMS = "oozie.pig.script.params";
053
054 public static final String HIVE_SCRIPT = "oozie.hive.script";
055
056 public static final String HIVE_OPTIONS = "oozie.hive.options";
057
058 public static final String HIVE_SCRIPT_PARAMS = "oozie.hive.script.params";
059
060 public static final String FILES = "oozie.files";
061
062 public static final String ARCHIVES = "oozie.archives";
063
064 public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission";
065
066 protected XOozieClient() {
067 }
068
069 /**
070 * Create an eXtended Workflow client instance.
071 *
072 * @param oozieUrl URL of the Oozie instance it will interact with.
073 */
074 public XOozieClient(String oozieUrl) {
075 super(oozieUrl);
076 }
077
078 private String readScript(String script) throws IOException {
079 if (!new File(script).exists()) {
080 throw new IOException("Error: script file [" + script + "] does not exist");
081 }
082
083 BufferedReader br = null;
084 try {
085 br = new BufferedReader(new FileReader(script));
086 StringBuilder sb = new StringBuilder();
087 String line;
088 while ((line = br.readLine()) != null) {
089 sb.append(line + "\n");
090 }
091 return sb.toString();
092 }
093 finally {
094 try {
095 br.close();
096 }
097 catch (IOException ex) {
098 System.err.println("Error: " + ex.getMessage());
099 }
100 }
101 }
102
103 static void setStrings(Properties conf, String key, String[] values) {
104 if (values != null) {
105 conf.setProperty(key + ".size", (new Integer(values.length)).toString());
106 for (int i = 0; i < values.length; i++) {
107 conf.setProperty(key + "." + i, values[i]);
108 }
109 }
110 }
111
112 private void validateHttpSubmitConf(Properties conf) {
113 String JT = conf.getProperty(XOozieClient.JT);
114 String JT_2 = conf.getProperty(XOozieClient.JT_2);
115 if (JT == null) {
116 if(JT_2 == null) {
117 throw new RuntimeException("jobtracker is not specified in conf");
118 }
119 }
120
121 String NN = conf.getProperty(XOozieClient.NN);
122 String NN_2 = conf.getProperty(XOozieClient.NN_2);
123 if (NN == null) {
124 if(NN_2 == null) {
125 throw new RuntimeException("namenode is not specified in conf");
126 } else {
127 NN = NN_2;
128 }
129 }
130
131 String libPath = conf.getProperty(LIBPATH);
132 if (libPath == null) {
133 throw new RuntimeException("libpath is not specified in conf");
134 }
135 if (!libPath.contains(":/")) {
136 String newLibPath;
137 if (libPath.startsWith("/")) {
138 if(NN.endsWith("/")) {
139 newLibPath = NN + libPath.substring(1);
140 } else {
141 newLibPath = NN + libPath;
142 }
143 } else {
144 throw new RuntimeException("libpath should be absolute");
145 }
146 conf.setProperty(LIBPATH, newLibPath);
147 }
148
149 conf.setProperty(IS_PROXY_SUBMISSION, "true");
150 }
151
152 /**
153 * Submit a Pig job via HTTP.
154 *
155 * @param conf job configuration.
156 * @param pigScriptFile pig script file.
157 * @param pigArgs pig arguments string.
158 * @return the job Id.
159 * @throws OozieClientException thrown if the job could not be submitted.
160 */
161 @Deprecated
162 public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException {
163 return submitScriptLanguage(conf, pigScriptFile, pigArgs, OozieCLI.PIG_CMD);
164 }
165
166 /**
167 * Submit a Pig or Hive job via HTTP.
168 *
169 * @param conf job configuration.
170 * @param scriptFile script file.
171 * @param args arguments string.
172 * @return the job Id.
173 * @throws OozieClientException thrown if the job could not be submitted.
174 */
175 public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String jobType)
176 throws IOException, OozieClientException {
177 return submitScriptLanguage(conf, scriptFile, args, null, jobType);
178 }
179
180 /**
181 * Submit a Pig or Hive job via HTTP.
182 *
183 * @param conf job configuration.
184 * @param scriptFile script file.
185 * @param args arguments string.
186 * @param params parameters string.
187 * @return the job Id.
188 * @throws OozieClientException thrown if the job could not be submitted.
189 */
190 public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String[] params, String jobType)
191 throws IOException, OozieClientException {
192 if (conf == null) {
193 throw new IllegalArgumentException("conf cannot be null");
194 }
195 if (scriptFile == null) {
196 throw new IllegalArgumentException("scriptFile cannot be null");
197 }
198
199 validateHttpSubmitConf(conf);
200
201 String script = "";
202 String options = "";
203 String scriptParams = "";
204
205 if (jobType.equals(OozieCLI.HIVE_CMD)) {
206 script = XOozieClient.HIVE_SCRIPT;
207 options = XOozieClient.HIVE_OPTIONS;
208 scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS;
209 }
210 else if (jobType.equals(OozieCLI.PIG_CMD)) {
211 script = XOozieClient.PIG_SCRIPT;
212 options = XOozieClient.PIG_OPTIONS;
213 scriptParams = XOozieClient.PIG_SCRIPT_PARAMS;
214 }
215 else {
216 throw new IllegalArgumentException("jobType must be either pig or hive");
217 }
218
219 conf.setProperty(script, readScript(scriptFile));
220 setStrings(conf, options, args);
221 setStrings(conf, scriptParams, params);
222
223 return (new HttpJobSubmit(conf, jobType)).call();
224 }
225
226 /**
227 * Submit a Map/Reduce job via HTTP.
228 *
229 * @param conf job configuration.
230 * @return the job Id.
231 * @throws OozieClientException thrown if the job could not be submitted.
232 */
233 public String submitMapReduce(Properties conf) throws OozieClientException {
234 if (conf == null) {
235 throw new IllegalArgumentException("conf cannot be null");
236 }
237
238 validateHttpSubmitConf(conf);
239
240 return (new HttpJobSubmit(conf, "mapreduce")).call();
241 }
242
243 private class HttpJobSubmit extends ClientCallable<String> {
244 private Properties conf;
245
246 HttpJobSubmit(Properties conf, String jobType) {
247 super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType));
248 this.conf = notNull(conf, "conf");
249 }
250
251 @Override
252 protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
253 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
254 writeToXml(conf, conn.getOutputStream());
255 if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
256 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
257 return (String) json.get(JsonTags.JOB_ID);
258 }
259 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
260 handleError(conn);
261 }
262 return null;
263 }
264 }
265
266 /**
267 * set LIBPATH for HTTP submission job.
268 *
269 * @param conf Configuration object.
270 * @param pathStr lib HDFS path.
271 */
272 public void setLib(Properties conf, String pathStr) {
273 conf.setProperty(LIBPATH, pathStr);
274 }
275
276 /**
277 * The equivalent to <file> tag in oozie's workflow xml.
278 *
279 * @param conf Configuration object.
280 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
281 * For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as
282 * symbolic link name.
283 */
284 public void addFile(Properties conf, String file) {
285 if (file == null || file.length() == 0) {
286 throw new IllegalArgumentException("file cannot be null or empty");
287 }
288 String files = conf.getProperty(FILES);
289 conf.setProperty(FILES, files == null ? file : files + "," + file);
290 }
291
292 /**
293 * The equivalent to <archive> tag in oozie's workflow xml.
294 *
295 * @param conf Configuration object.
296 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
297 * For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as
298 * symbolic link name.
299 */
300 public void addArchive(Properties conf, String file) {
301 if (file == null || file.length() == 0) {
302 throw new IllegalArgumentException("file cannot be null or empty");
303 }
304 String files = conf.getProperty(ARCHIVES);
305 conf.setProperty(ARCHIVES, files == null ? file : files + "," + file);
306 }
307 }