001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.client;
019
020import java.io.BufferedReader;
021import java.io.File;
022import java.io.FileReader;
023import java.io.IOException;
024import java.io.InputStreamReader;
025import java.net.HttpURLConnection;
026import java.util.Properties;
027
028import org.apache.oozie.cli.OozieCLI;
029import org.apache.oozie.client.rest.JsonTags;
030import org.apache.oozie.client.rest.RestConstants;
031import org.json.simple.JSONObject;
032import org.json.simple.JSONValue;
033
034public class XOozieClient extends OozieClient {
035
036    public static final String JT = "mapred.job.tracker";
037    public static final String JT_2 = "mapreduce.jobtracker.address";
038
039    public static final String NN = "fs.default.name";
040    public static final String NN_2 = "fs.defaultFS";
041
042    @Deprecated
043    public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
044
045    @Deprecated
046    public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
047
048    public static final String PIG_SCRIPT = "oozie.pig.script";
049
050    public static final String PIG_OPTIONS = "oozie.pig.options";
051
052    public static final String PIG_SCRIPT_PARAMS = "oozie.pig.script.params";
053
054    public static final String HIVE_SCRIPT = "oozie.hive.script";
055
056    public static final String HIVE_OPTIONS = "oozie.hive.options";
057
058    public static final String HIVE_SCRIPT_PARAMS = "oozie.hive.script.params";
059
060    public static final String SQOOP_COMMAND = "oozie.sqoop.command";
061
062    public static final String SQOOP_OPTIONS = "oozie.sqoop.options";
063
064    public static final String FILES = "oozie.files";
065
066    public static final String ARCHIVES = "oozie.archives";
067
068    public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission";
069
070    protected XOozieClient() {
071    }
072
073    /**
074     * Create an eXtended Workflow client instance.
075     *
076     * @param oozieUrl URL of the Oozie instance it will interact with.
077     */
078    public XOozieClient(String oozieUrl) {
079        super(oozieUrl);
080    }
081
082    private String readScript(String script) throws IOException {
083        if (!new File(script).exists()) {
084            throw new IOException("Error: script file [" + script + "] does not exist");
085        }
086
087        BufferedReader br = null;
088        try {
089            br = new BufferedReader(new FileReader(script));
090            StringBuilder sb = new StringBuilder();
091            String line;
092            while ((line = br.readLine()) != null) {
093                sb.append(line + "\n");
094            }
095            return sb.toString();
096        }
097        finally {
098            try {
099                br.close();
100            }
101            catch (IOException ex) {
102                System.err.println("Error: " + ex.getMessage());
103            }
104        }
105    }
106
107    private String serializeSqoopCommand(String[] command) {
108        StringBuilder sb = new StringBuilder();
109        for (String arg : command) {
110            sb.append(arg).append("\n");
111        }
112        return sb.toString();
113    }
114
115    static void setStrings(Properties conf, String key, String[] values) {
116        if (values != null) {
117            conf.setProperty(key + ".size", (new Integer(values.length)).toString());
118            for (int i = 0; i < values.length; i++) {
119                conf.setProperty(key + "." + i, values[i]);
120            }
121        }
122    }
123
124    private void validateHttpSubmitConf(Properties conf) {
125        String JT = conf.getProperty(XOozieClient.JT);
126        String JT_2 = conf.getProperty(XOozieClient.JT_2);
127        if (JT == null) {
128            if(JT_2 == null) {
129                throw new RuntimeException("jobtracker is not specified in conf");
130            }
131        }
132
133        String NN = conf.getProperty(XOozieClient.NN);
134        String NN_2 = conf.getProperty(XOozieClient.NN_2);
135        if (NN == null) {
136            if(NN_2 == null) {
137                throw new RuntimeException("namenode is not specified in conf");
138            } else {
139                NN = NN_2;
140            }
141        }
142
143        String libPath = conf.getProperty(LIBPATH);
144        if (libPath == null) {
145            throw new RuntimeException("libpath is not specified in conf");
146        }
147        if (!libPath.contains(":/")) {
148            String newLibPath;
149            if (libPath.startsWith("/")) {
150                if(NN.endsWith("/")) {
151                    newLibPath = NN + libPath.substring(1);
152                } else {
153                    newLibPath = NN + libPath;
154                }
155            } else {
156                throw new RuntimeException("libpath should be absolute");
157            }
158            conf.setProperty(LIBPATH, newLibPath);
159        }
160
161        conf.setProperty(IS_PROXY_SUBMISSION, "true");
162    }
163
164    /**
165     * Submit a Pig job via HTTP.
166     *
167     * @param conf job configuration.
168     * @param pigScriptFile pig script file.
169     * @param pigArgs pig arguments string.
170     * @return the job Id.
171     * @throws OozieClientException thrown if the job could not be submitted.
172     */
173    @Deprecated
174    public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException {
175        return submitScriptLanguage(conf, pigScriptFile, pigArgs, OozieCLI.PIG_CMD);
176    }
177
178    /**
179     * Submit a Pig or Hive job via HTTP.
180     *
181     * @param conf job configuration.
182     * @param scriptFile  script file.
183     * @param args  arguments string.
184     * @return the job Id.
185     * @throws OozieClientException thrown if the job could not be submitted.
186     */
187    public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String jobType)
188            throws IOException, OozieClientException {
189        return submitScriptLanguage(conf, scriptFile, args, null, jobType);
190    }
191
192    /**
193     * Submit a Pig or Hive job via HTTP.
194     *
195     * @param conf job configuration.
196     * @param scriptFile  script file.
197     * @param args  arguments string.
198     * @param params parameters string.
199     * @return the job Id.
200     * @throws OozieClientException thrown if the job could not be submitted.
201     */
202    public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String[] params, String jobType)
203            throws IOException, OozieClientException {
204        if (conf == null) {
205            throw new IllegalArgumentException("conf cannot be null");
206        }
207        if (scriptFile == null) {
208            throw new IllegalArgumentException("scriptFile cannot be null");
209        }
210
211        validateHttpSubmitConf(conf);
212
213        String script = "";
214        String options = "";
215        String scriptParams = "";
216
217        if (jobType.equals(OozieCLI.HIVE_CMD)) {
218            script = XOozieClient.HIVE_SCRIPT;
219            options = XOozieClient.HIVE_OPTIONS;
220            scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS;
221        }
222        else if (jobType.equals(OozieCLI.PIG_CMD)) {
223            script =  XOozieClient.PIG_SCRIPT;
224            options = XOozieClient.PIG_OPTIONS;
225            scriptParams = XOozieClient.PIG_SCRIPT_PARAMS;
226        }
227        else {
228            throw new IllegalArgumentException("jobType must be either pig or hive");
229        }
230
231        conf.setProperty(script, readScript(scriptFile));
232        setStrings(conf, options, args);
233        setStrings(conf, scriptParams, params);
234
235        return (new HttpJobSubmit(conf, jobType)).call();
236    }
237
238    /**
239     * Submit a Sqoop job via HTTP.
240     *
241     * @param conf job configuration.
242     * @param command sqoop command to run.
243     * @param args  arguments string.
244     * @return the job Id.
245     * @throws OozieClientException thrown if the job could not be submitted.
246     */
247    public String submitSqoop(Properties conf, String[] command, String[] args)
248            throws OozieClientException {
249        if (conf == null) {
250            throw new IllegalArgumentException("conf cannot be null");
251        }
252        if (command == null) {
253            throw new IllegalArgumentException("command cannot be null");
254        }
255
256        validateHttpSubmitConf(conf);
257
258        conf.setProperty(XOozieClient.SQOOP_COMMAND, serializeSqoopCommand(command));
259        setStrings(conf, XOozieClient.SQOOP_OPTIONS, args);
260
261        return (new HttpJobSubmit(conf, OozieCLI.SQOOP_CMD)).call();
262    }
263
264    /**
265     * Submit a Map/Reduce job via HTTP.
266     *
267     * @param conf job configuration.
268     * @return the job Id.
269     * @throws OozieClientException thrown if the job could not be submitted.
270     */
271    public String submitMapReduce(Properties conf) throws OozieClientException {
272        if (conf == null) {
273            throw new IllegalArgumentException("conf cannot be null");
274        }
275
276        validateHttpSubmitConf(conf);
277
278        return (new HttpJobSubmit(conf, "mapreduce")).call();
279    }
280
281    private class HttpJobSubmit extends ClientCallable<String> {
282        private Properties conf;
283
284        HttpJobSubmit(Properties conf, String jobType) {
285            super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType));
286            this.conf = notNull(conf, "conf");
287        }
288
289        @Override
290        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
291            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
292            writeToXml(conf, conn.getOutputStream());
293            if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
294                JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
295                return (String) json.get(JsonTags.JOB_ID);
296            }
297            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
298                handleError(conn);
299            }
300            return null;
301        }
302    }
303
304    /**
305     * set LIBPATH for HTTP submission job.
306     *
307     * @param conf Configuration object.
308     * @param pathStr lib HDFS path.
309     */
310    public void setLib(Properties conf, String pathStr) {
311        conf.setProperty(LIBPATH, pathStr);
312    }
313
314    /**
315     * The equivalent to <file> tag in oozie's workflow xml.
316     *
317     * @param conf Configuration object.
318     * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
319     *             For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as
320     *             symbolic link name.
321     */
322    public void addFile(Properties conf, String file) {
323        if (file == null || file.length() == 0) {
324            throw new IllegalArgumentException("file cannot be null or empty");
325        }
326        String files = conf.getProperty(FILES);
327        conf.setProperty(FILES, files == null ? file : files + "," + file);
328    }
329
330    /**
331     * The equivalent to <archive> tag in oozie's workflow xml.
332     *
333     * @param conf Configuration object.
334     * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
335     *             For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as
336     *             symbolic link name.
337     */
338    public void addArchive(Properties conf, String file) {
339        if (file == null || file.length() == 0) {
340            throw new IllegalArgumentException("file cannot be null or empty");
341        }
342        String files = conf.getProperty(ARCHIVES);
343        conf.setProperty(ARCHIVES, files == null ? file : files + "," + file);
344    }
345}