001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.client;
020
021import java.io.BufferedReader;
022import java.io.File;
023import java.io.FileReader;
024import java.io.IOException;
025import java.io.InputStreamReader;
026import java.net.HttpURLConnection;
027import java.util.Properties;
028
029import org.apache.oozie.cli.OozieCLI;
030import org.apache.oozie.client.rest.JsonTags;
031import org.apache.oozie.client.rest.RestConstants;
032import org.json.simple.JSONObject;
033import org.json.simple.JSONValue;
034
035public class XOozieClient extends OozieClient {
036
037    public static final String JT = "mapred.job.tracker";
038    public static final String JT_2 = "mapreduce.jobtracker.address";
039
040    public static final String NN = "fs.default.name";
041    public static final String NN_2 = "fs.defaultFS";
042
043    @Deprecated
044    public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
045
046    @Deprecated
047    public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
048
049    public static final String PIG_SCRIPT = "oozie.pig.script";
050
051    public static final String PIG_OPTIONS = "oozie.pig.options";
052
053    public static final String PIG_SCRIPT_PARAMS = "oozie.pig.script.params";
054
055    public static final String HIVE_SCRIPT = "oozie.hive.script";
056
057    public static final String HIVE_OPTIONS = "oozie.hive.options";
058
059    public static final String HIVE_SCRIPT_PARAMS = "oozie.hive.script.params";
060
061    public static final String SQOOP_COMMAND = "oozie.sqoop.command";
062
063    public static final String SQOOP_OPTIONS = "oozie.sqoop.options";
064
065    public static final String FILES = "oozie.files";
066
067    public static final String ARCHIVES = "oozie.archives";
068
069    public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission";
070
071    protected XOozieClient() {
072    }
073
074    /**
075     * Create an eXtended Workflow client instance.
076     *
077     * @param oozieUrl URL of the Oozie instance it will interact with.
078     */
079    public XOozieClient(String oozieUrl) {
080        super(oozieUrl);
081    }
082
083    private String readScript(String script) throws IOException {
084        if (!new File(script).exists()) {
085            throw new IOException("Error: script file [" + script + "] does not exist");
086        }
087
088        BufferedReader br = null;
089        try {
090            br = new BufferedReader(new FileReader(script));
091            StringBuilder sb = new StringBuilder();
092            String line;
093            while ((line = br.readLine()) != null) {
094                sb.append(line + "\n");
095            }
096            return sb.toString();
097        }
098        finally {
099            try {
100                br.close();
101            }
102            catch (IOException ex) {
103                System.err.println("Error: " + ex.getMessage());
104            }
105        }
106    }
107
108    private String serializeSqoopCommand(String[] command) {
109        StringBuilder sb = new StringBuilder();
110        for (String arg : command) {
111            sb.append(arg).append("\n");
112        }
113        return sb.toString();
114    }
115
116    static void setStrings(Properties conf, String key, String[] values) {
117        if (values != null) {
118            conf.setProperty(key + ".size", (new Integer(values.length)).toString());
119            for (int i = 0; i < values.length; i++) {
120                conf.setProperty(key + "." + i, values[i]);
121            }
122        }
123    }
124
125    private void validateHttpSubmitConf(Properties conf) {
126        String JT = conf.getProperty(XOozieClient.JT);
127        String JT_2 = conf.getProperty(XOozieClient.JT_2);
128        if (JT == null) {
129            if(JT_2 == null) {
130                throw new RuntimeException("jobtracker is not specified in conf");
131            }
132        }
133
134        String NN = conf.getProperty(XOozieClient.NN);
135        String NN_2 = conf.getProperty(XOozieClient.NN_2);
136        if (NN == null) {
137            if(NN_2 == null) {
138                throw new RuntimeException("namenode is not specified in conf");
139            } else {
140                NN = NN_2;
141            }
142        }
143
144        String libPath = conf.getProperty(LIBPATH);
145        if (libPath == null) {
146            throw new RuntimeException("libpath is not specified in conf");
147        }
148        if (!libPath.contains(":/")) {
149            String newLibPath;
150            if (libPath.startsWith("/")) {
151                if(NN.endsWith("/")) {
152                    newLibPath = NN + libPath.substring(1);
153                } else {
154                    newLibPath = NN + libPath;
155                }
156            } else {
157                throw new RuntimeException("libpath should be absolute");
158            }
159            conf.setProperty(LIBPATH, newLibPath);
160        }
161
162        conf.setProperty(IS_PROXY_SUBMISSION, "true");
163    }
164
165    /**
166     * Submit a Pig job via HTTP.
167     *
168     * @param conf job configuration.
169     * @param pigScriptFile pig script file.
170     * @param pigArgs pig arguments string.
171     * @return the job Id.
172     * @throws OozieClientException thrown if the job could not be submitted.
173     */
174    @Deprecated
175    public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException {
176        return submitScriptLanguage(conf, pigScriptFile, pigArgs, OozieCLI.PIG_CMD);
177    }
178
179    /**
180     * Submit a Pig or Hive job via HTTP.
181     *
182     * @param conf job configuration.
183     * @param scriptFile  script file.
184     * @param args  arguments string.
185     * @return the job Id.
186     * @throws OozieClientException thrown if the job could not be submitted.
187     */
188    public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String jobType)
189            throws IOException, OozieClientException {
190        return submitScriptLanguage(conf, scriptFile, args, null, jobType);
191    }
192
193    /**
194     * Submit a Pig or Hive job via HTTP.
195     *
196     * @param conf job configuration.
197     * @param scriptFile  script file.
198     * @param args  arguments string.
199     * @param params parameters string.
200     * @return the job Id.
201     * @throws OozieClientException thrown if the job could not be submitted.
202     */
203    public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String[] params, String jobType)
204            throws IOException, OozieClientException {
205        OozieClient.notNull(conf, "conf");
206        OozieClient.notNull(scriptFile, "scriptFile");
207        validateHttpSubmitConf(conf);
208
209        String script = "";
210        String options = "";
211        String scriptParams = "";
212
213        if (jobType.equals(OozieCLI.HIVE_CMD)) {
214            script = XOozieClient.HIVE_SCRIPT;
215            options = XOozieClient.HIVE_OPTIONS;
216            scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS;
217        }
218        else if (jobType.equals(OozieCLI.PIG_CMD)) {
219            script =  XOozieClient.PIG_SCRIPT;
220            options = XOozieClient.PIG_OPTIONS;
221            scriptParams = XOozieClient.PIG_SCRIPT_PARAMS;
222        }
223        else {
224            throw new IllegalArgumentException("jobType must be either pig or hive");
225        }
226
227        conf.setProperty(script, readScript(scriptFile));
228        setStrings(conf, options, args);
229        setStrings(conf, scriptParams, params);
230
231        return (new HttpJobSubmit(conf, jobType)).call();
232    }
233
234    /**
235     * Submit a Sqoop job via HTTP.
236     *
237     * @param conf job configuration.
238     * @param command sqoop command to run.
239     * @param args  arguments string.
240     * @return the job Id.
241     * @throws OozieClientException thrown if the job could not be submitted.
242     */
243    public String submitSqoop(Properties conf, String[] command, String[] args)
244            throws OozieClientException {
245        OozieClient.notNull(conf, "conf");
246        OozieClient.notNull(command, "command");
247        validateHttpSubmitConf(conf);
248
249        conf.setProperty(XOozieClient.SQOOP_COMMAND, serializeSqoopCommand(command));
250        setStrings(conf, XOozieClient.SQOOP_OPTIONS, args);
251
252        return (new HttpJobSubmit(conf, OozieCLI.SQOOP_CMD)).call();
253    }
254
255    /**
256     * Submit a Map/Reduce job via HTTP.
257     *
258     * @param conf job configuration.
259     * @return the job Id.
260     * @throws OozieClientException thrown if the job could not be submitted.
261     */
262    public String submitMapReduce(Properties conf) throws OozieClientException {
263        OozieClient.notNull(conf, "conf");
264        validateHttpSubmitConf(conf);
265
266        return (new HttpJobSubmit(conf, "mapreduce")).call();
267    }
268
269    private class HttpJobSubmit extends ClientCallable<String> {
270        private Properties conf;
271
272        HttpJobSubmit(Properties conf, String jobType) {
273            super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType));
274            this.conf = notNull(conf, "conf");
275        }
276
277        @Override
278        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
279            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
280            writeToXml(conf, conn.getOutputStream());
281            if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
282                JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
283                return (String) json.get(JsonTags.JOB_ID);
284            }
285            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
286                handleError(conn);
287            }
288            return null;
289        }
290    }
291
292    /**
293     * set LIBPATH for HTTP submission job.
294     *
295     * @param conf Configuration object.
296     * @param pathStr lib HDFS path.
297     */
298    public void setLib(Properties conf, String pathStr) {
299        conf.setProperty(LIBPATH, pathStr);
300    }
301
302    /**
303     * The equivalent to &lt;file&gt; tag in oozie's workflow xml.
304     *
305     * @param conf Configuration object.
306     * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
307     *             For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as
308     *             symbolic link name.
309     */
310    public void addFile(Properties conf, String file) {
311        OozieClient.notEmpty(file, "file");
312        String files = conf.getProperty(FILES);
313        conf.setProperty(FILES, files == null ? file : files + "," + file);
314    }
315
316    /**
317     * The equivalent to &lt;archive&gt; tag in oozie's workflow xml.
318     *
319     * @param conf Configuration object.
320     * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
321     *             For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as
322     *             symbolic link name.
323     */
324    public void addArchive(Properties conf, String file) {
325        OozieClient.notEmpty(file, "file");
326        String files = conf.getProperty(ARCHIVES);
327        conf.setProperty(ARCHIVES, files == null ? file : files + "," + file);
328    }
329}