This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.client;
019
020 import java.io.BufferedReader;
021 import java.io.File;
022 import java.io.FileReader;
023 import java.io.IOException;
024 import java.io.InputStreamReader;
025 import java.net.HttpURLConnection;
026 import java.util.Properties;
027
028 import org.apache.oozie.client.rest.JsonTags;
029 import org.apache.oozie.client.rest.RestConstants;
030 import org.json.simple.JSONObject;
031 import org.json.simple.JSONValue;
032
033 public class XOozieClient extends OozieClient {
034
035 public static final String JT = "mapred.job.tracker";
036 public static final String JT_2 = "mapreduce.jobtracker.address";
037
038 public static final String NN = "fs.default.name";
039 public static final String NN_2 = "fs.defaultFS";
040
041 @Deprecated
042 public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
043
044 @Deprecated
045 public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
046
047 public static final String PIG_SCRIPT = "oozie.pig.script";
048
049 public static final String PIG_OPTIONS = "oozie.pig.options";
050
051 public static final String FILES = "oozie.files";
052
053 public static final String ARCHIVES = "oozie.archives";
054
055 public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission";
056
057 protected XOozieClient() {
058 }
059
060 /**
061 * Create an eXtended Workflow client instance.
062 *
063 * @param oozieUrl URL of the Oozie instance it will interact with.
064 */
065 public XOozieClient(String oozieUrl) {
066 super(oozieUrl);
067 }
068
069 private String readPigScript(String script) throws IOException {
070 if (!new File(script).exists()) {
071 throw new IOException("Error: Pig script file [" + script + "] does not exist");
072 }
073
074 BufferedReader br = null;
075 try {
076 br = new BufferedReader(new FileReader(script));
077 StringBuilder sb = new StringBuilder();
078 String line;
079 while ((line = br.readLine()) != null) {
080 sb.append(line + "\n");
081 }
082 return sb.toString();
083 }
084 finally {
085 try {
086 br.close();
087 }
088 catch (IOException ex) {
089 System.err.println("Error: " + ex.getMessage());
090 }
091 }
092 }
093
094 static void setStrings(Properties conf, String key, String[] values) {
095 if (values != null) {
096 conf.setProperty(key + ".size", (new Integer(values.length)).toString());
097 for (int i = 0; i < values.length; i++) {
098 conf.setProperty(key + "." + i, values[i]);
099 }
100 }
101 }
102
103 private void validateHttpSubmitConf(Properties conf) {
104 String JT = conf.getProperty(XOozieClient.JT);
105 String JT_2 = conf.getProperty(XOozieClient.JT_2);
106 if (JT == null) {
107 if(JT_2 == null) {
108 throw new RuntimeException("jobtracker is not specified in conf");
109 }
110 }
111
112 String NN = conf.getProperty(XOozieClient.NN);
113 String NN_2 = conf.getProperty(XOozieClient.NN_2);
114 if (NN == null) {
115 if(NN_2 == null) {
116 throw new RuntimeException("namenode is not specified in conf");
117 }
118 }
119
120 String libPath = conf.getProperty(LIBPATH);
121 if (libPath == null) {
122 throw new RuntimeException("libpath is not specified in conf");
123 }
124 if (!libPath.contains(":/")) {
125 String newLibPath;
126 if (libPath.startsWith("/")) {
127 if(NN.endsWith("/")) {
128 newLibPath = NN + libPath.substring(1);
129 } else {
130 newLibPath = NN + libPath;
131 }
132 } else {
133 throw new RuntimeException("libpath should be absolute");
134 }
135 conf.setProperty(LIBPATH, newLibPath);
136 }
137
138 conf.setProperty(IS_PROXY_SUBMISSION, "true");
139 }
140
141 /**
142 * Submit a Pig job via HTTP.
143 *
144 * @param conf job configuration.
145 * @param pigScriptFile pig script file.
146 * @param pigArgs pig arguments string.
147 * @return the job Id.
148 * @throws OozieClientException thrown if the job could not be submitted.
149 */
150 public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException {
151 if (conf == null) {
152 throw new IllegalArgumentException("conf cannot be null");
153 }
154 if (pigScriptFile == null) {
155 throw new IllegalArgumentException("pigScriptFile cannot be null");
156 }
157
158 validateHttpSubmitConf(conf);
159
160 conf.setProperty(XOozieClient.PIG_SCRIPT, readPigScript(pigScriptFile));
161 setStrings(conf, XOozieClient.PIG_OPTIONS, pigArgs);
162
163 return (new HttpJobSubmit(conf, "pig")).call();
164 }
165
166 /**
167 * Submit a Map/Reduce job via HTTP.
168 *
169 * @param conf job configuration.
170 * @return the job Id.
171 * @throws OozieClientException thrown if the job could not be submitted.
172 */
173 public String submitMapReduce(Properties conf) throws OozieClientException {
174 if (conf == null) {
175 throw new IllegalArgumentException("conf cannot be null");
176 }
177
178 validateHttpSubmitConf(conf);
179
180 return (new HttpJobSubmit(conf, "mapreduce")).call();
181 }
182
183 private class HttpJobSubmit extends ClientCallable<String> {
184 private Properties conf;
185
186 HttpJobSubmit(Properties conf, String jobType) {
187 super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType));
188 this.conf = notNull(conf, "conf");
189 }
190
191 @Override
192 protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
193 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
194 writeToXml(conf, conn.getOutputStream());
195 if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
196 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
197 return (String) json.get(JsonTags.JOB_ID);
198 }
199 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
200 handleError(conn);
201 }
202 return null;
203 }
204 }
205
206 /**
207 * set LIBPATH for HTTP submission job.
208 *
209 * @param conf Configuration object.
210 * @param pathStr lib HDFS path.
211 */
212 public void setLib(Properties conf, String pathStr) {
213 conf.setProperty(LIBPATH, pathStr);
214 }
215
216 /**
217 * The equivalent to <file> tag in oozie's workflow xml.
218 *
219 * @param conf Configuration object.
220 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
221 * For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as
222 * symbolic link name.
223 */
224 public void addFile(Properties conf, String file) {
225 if (file == null || file.length() == 0) {
226 throw new IllegalArgumentException("file cannot be null or empty");
227 }
228 String files = conf.getProperty(FILES);
229 conf.setProperty(FILES, files == null ? file : files + "," + file);
230 }
231
232 /**
233 * The equivalent to <archive> tag in oozie's workflow xml.
234 *
235 * @param conf Configuration object.
236 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
237 * For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as
238 * symbolic link name.
239 */
240 public void addArchive(Properties conf, String file) {
241 if (file == null || file.length() == 0) {
242 throw new IllegalArgumentException("file cannot be null or empty");
243 }
244 String files = conf.getProperty(ARCHIVES);
245 conf.setProperty(ARCHIVES, files == null ? file : files + "," + file);
246 }
247 }