This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.client;
019
020 import java.io.BufferedReader;
021 import java.io.File;
022 import java.io.FileReader;
023 import java.io.IOException;
024 import java.io.InputStreamReader;
025 import java.net.HttpURLConnection;
026 import java.util.Properties;
027
028 import org.apache.oozie.client.rest.JsonTags;
029 import org.apache.oozie.client.rest.RestConstants;
030 import org.json.simple.JSONObject;
031 import org.json.simple.JSONValue;
032
033 public class XOozieClient extends OozieClient {
034
035 public static final String JT = "mapred.job.tracker";
036
037 public static final String NN = "fs.default.name";
038
039 public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
040
041 public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
042
043 public static final String PIG_SCRIPT = "oozie.pig.script";
044
045 public static final String PIG_OPTIONS = "oozie.pig.options";
046
047 public static final String FILES = "oozie.files";
048
049 public static final String ARCHIVES = "oozie.archives";
050
051 public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission";
052
053 protected XOozieClient() {
054 }
055
056 /**
057 * Create an eXtended Workflow client instance.
058 *
059 * @param oozieUrl URL of the Oozie instance it will interact with.
060 */
061 public XOozieClient(String oozieUrl) {
062 super(oozieUrl);
063 }
064
065 private String readPigScript(String script) throws IOException {
066 if (!new File(script).exists()) {
067 throw new IOException("Error: Pig script file [" + script + "] does not exist");
068 }
069
070 BufferedReader br = null;
071 try {
072 br = new BufferedReader(new FileReader(script));
073 StringBuilder sb = new StringBuilder();
074 String line;
075 while ((line = br.readLine()) != null) {
076 sb.append(line + "\n");
077 }
078 return sb.toString();
079 }
080 finally {
081 try {
082 br.close();
083 }
084 catch (IOException ex) {
085 System.err.println("Error: " + ex.getMessage());
086 }
087 }
088 }
089
090 static void setStrings(Properties conf, String key, String[] values) {
091 if (values != null) {
092 conf.setProperty(key + ".size", (new Integer(values.length)).toString());
093 for (int i = 0; i < values.length; i++) {
094 conf.setProperty(key + "." + i, values[i]);
095 }
096 }
097 }
098
099 private void validateHttpSubmitConf(Properties conf) {
100 String JT = conf.getProperty(XOozieClient.JT);
101 if (JT == null) {
102 throw new RuntimeException("jobtracker is not specified in conf");
103 }
104
105 String NN = conf.getProperty(XOozieClient.NN);
106 if (NN == null) {
107 throw new RuntimeException("namenode is not specified in conf");
108 }
109
110 String libPath = conf.getProperty(LIBPATH);
111 if (libPath == null) {
112 throw new RuntimeException("libpath is not specified in conf");
113 }
114 if (!libPath.startsWith("hdfs://")) {
115 String newLibPath = NN + libPath;
116 conf.setProperty(LIBPATH, newLibPath);
117 }
118
119 conf.setProperty(IS_PROXY_SUBMISSION, "true");
120 }
121
122 /**
123 * Submit a Pig job via HTTP.
124 *
125 * @param conf job configuration.
126 * @param pigScriptFile pig script file.
127 * @param pigArgs pig arguments string.
128 * @return the job Id.
129 * @throws OozieClientException thrown if the job could not be submitted.
130 */
131 public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException {
132 if (conf == null) {
133 throw new IllegalArgumentException("conf cannot be null");
134 }
135 if (pigScriptFile == null) {
136 throw new IllegalArgumentException("pigScriptFile cannot be null");
137 }
138
139 validateHttpSubmitConf(conf);
140
141 conf.setProperty(XOozieClient.PIG_SCRIPT, readPigScript(pigScriptFile));
142 setStrings(conf, XOozieClient.PIG_OPTIONS, pigArgs);
143
144 return (new HttpJobSubmit(conf, "pig")).call();
145 }
146
147 /**
148 * Submit a Map/Reduce job via HTTP.
149 *
150 * @param conf job configuration.
151 * @return the job Id.
152 * @throws OozieClientException thrown if the job could not be submitted.
153 */
154 public String submitMapReduce(Properties conf) throws OozieClientException {
155 if (conf == null) {
156 throw new IllegalArgumentException("conf cannot be null");
157 }
158
159 validateHttpSubmitConf(conf);
160
161 return (new HttpJobSubmit(conf, "mapreduce")).call();
162 }
163
164 private class HttpJobSubmit extends ClientCallable<String> {
165 private Properties conf;
166
167 HttpJobSubmit(Properties conf, String jobType) {
168 super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType));
169 this.conf = notNull(conf, "conf");
170 }
171
172 @Override
173 protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
174 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
175 writeToXml(conf, conn.getOutputStream());
176 if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
177 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
178 return (String) json.get(JsonTags.JOB_ID);
179 }
180 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
181 handleError(conn);
182 }
183 return null;
184 }
185 }
186
187 /**
188 * set LIBPATH for HTTP submission job.
189 *
190 * @param conf Configuration object.
191 * @param pathStr lib HDFS path.
192 */
193 public void setLib(Properties conf, String pathStr) {
194 conf.setProperty(LIBPATH, pathStr);
195 }
196
197 /**
198 * The equivalent to <file> tag in oozie's workflow xml.
199 *
200 * @param conf Configuration object.
201 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
202 * For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as
203 * symbolic link name.
204 */
205 public void addFile(Properties conf, String file) {
206 if (file == null || file.length() == 0) {
207 throw new IllegalArgumentException("file cannot be null or empty");
208 }
209 String files = conf.getProperty(FILES);
210 conf.setProperty(FILES, files == null ? file : files + "," + file);
211 }
212
213 /**
214 * The equivalent to <archive> tag in oozie's workflow xml.
215 *
216 * @param conf Configuration object.
217 * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
218 * For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as
219 * symbolic link name.
220 */
221 public void addArchive(Properties conf, String file) {
222 if (file == null || file.length() == 0) {
223 throw new IllegalArgumentException("file cannot be null or empty");
224 }
225 String files = conf.getProperty(ARCHIVES);
226 conf.setProperty(ARCHIVES, files == null ? file : files + "," + file);
227 }
228 }