001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.client;
019    
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileReader;
023    import java.io.IOException;
024    import java.io.InputStreamReader;
025    import java.net.HttpURLConnection;
026    import java.util.Properties;
027    
028    import org.apache.oozie.cli.OozieCLI;
029    import org.apache.oozie.client.rest.JsonTags;
030    import org.apache.oozie.client.rest.RestConstants;
031    import org.json.simple.JSONObject;
032    import org.json.simple.JSONValue;
033    
034    public class XOozieClient extends OozieClient {
035    
036        public static final String JT = "mapred.job.tracker";
037        public static final String JT_2 = "mapreduce.jobtracker.address";
038    
039        public static final String NN = "fs.default.name";
040        public static final String NN_2 = "fs.defaultFS";
041    
042        @Deprecated
043        public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
044    
045        @Deprecated
046        public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
047    
048        public static final String PIG_SCRIPT = "oozie.pig.script";
049    
050        public static final String PIG_OPTIONS = "oozie.pig.options";
051    
052        public static final String PIG_SCRIPT_PARAMS = "oozie.pig.script.params";
053    
054        public static final String HIVE_SCRIPT = "oozie.hive.script";
055    
056        public static final String HIVE_OPTIONS = "oozie.hive.options";
057    
058        public static final String HIVE_SCRIPT_PARAMS = "oozie.hive.script.params";
059    
060        public static final String FILES = "oozie.files";
061    
062        public static final String ARCHIVES = "oozie.archives";
063    
064        public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission";
065    
066        protected XOozieClient() {
067        }
068    
069        /**
070         * Create an eXtended Workflow client instance.
071         *
072         * @param oozieUrl URL of the Oozie instance it will interact with.
073         */
074        public XOozieClient(String oozieUrl) {
075            super(oozieUrl);
076        }
077    
078        private String readScript(String script) throws IOException {
079            if (!new File(script).exists()) {
080                throw new IOException("Error: script file [" + script + "] does not exist");
081            }
082    
083            BufferedReader br = null;
084            try {
085                br = new BufferedReader(new FileReader(script));
086                StringBuilder sb = new StringBuilder();
087                String line;
088                while ((line = br.readLine()) != null) {
089                    sb.append(line + "\n");
090                }
091                return sb.toString();
092            }
093            finally {
094                try {
095                    br.close();
096                }
097                catch (IOException ex) {
098                    System.err.println("Error: " + ex.getMessage());
099                }
100            }
101        }
102    
103        static void setStrings(Properties conf, String key, String[] values) {
104            if (values != null) {
105                conf.setProperty(key + ".size", (new Integer(values.length)).toString());
106                for (int i = 0; i < values.length; i++) {
107                    conf.setProperty(key + "." + i, values[i]);
108                }
109            }
110        }
111    
112        private void validateHttpSubmitConf(Properties conf) {
113            String JT = conf.getProperty(XOozieClient.JT);
114            String JT_2 = conf.getProperty(XOozieClient.JT_2);
115            if (JT == null) {
116                if(JT_2 == null) {
117                    throw new RuntimeException("jobtracker is not specified in conf");
118                }
119            }
120    
121            String NN = conf.getProperty(XOozieClient.NN);
122            String NN_2 = conf.getProperty(XOozieClient.NN_2);
123            if (NN == null) {
124                if(NN_2 == null) {
125                    throw new RuntimeException("namenode is not specified in conf");
126                } else {
127                    NN = NN_2;
128                }
129            }
130    
131            String libPath = conf.getProperty(LIBPATH);
132            if (libPath == null) {
133                throw new RuntimeException("libpath is not specified in conf");
134            }
135            if (!libPath.contains(":/")) {
136                String newLibPath;
137                if (libPath.startsWith("/")) {
138                    if(NN.endsWith("/")) {
139                        newLibPath = NN + libPath.substring(1);
140                    } else {
141                        newLibPath = NN + libPath;
142                    }
143                } else {
144                    throw new RuntimeException("libpath should be absolute");
145                }
146                conf.setProperty(LIBPATH, newLibPath);
147            }
148    
149            conf.setProperty(IS_PROXY_SUBMISSION, "true");
150        }
151    
152        /**
153         * Submit a Pig job via HTTP.
154         *
155         * @param conf job configuration.
156         * @param pigScriptFile pig script file.
157         * @param pigArgs pig arguments string.
158         * @return the job Id.
159         * @throws OozieClientException thrown if the job could not be submitted.
160         */
161        @Deprecated
162        public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException {
163            return submitScriptLanguage(conf, pigScriptFile, pigArgs, OozieCLI.PIG_CMD);
164        }
165    
166        /**
167         * Submit a Pig or Hive job via HTTP.
168         *
169         * @param conf job configuration.
170         * @param scriptFile  script file.
171         * @param args  arguments string.
172         * @return the job Id.
173         * @throws OozieClientException thrown if the job could not be submitted.
174         */
175        public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String jobType)
176                throws IOException, OozieClientException {
177            return submitScriptLanguage(conf, scriptFile, args, null, jobType);
178        }
179    
180        /**
181         * Submit a Pig or Hive job via HTTP.
182         *
183         * @param conf job configuration.
184         * @param scriptFile  script file.
185         * @param args  arguments string.
186         * @param params parameters string.
187         * @return the job Id.
188         * @throws OozieClientException thrown if the job could not be submitted.
189         */
190        public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String[] params, String jobType)
191                throws IOException, OozieClientException {
192            if (conf == null) {
193                throw new IllegalArgumentException("conf cannot be null");
194            }
195            if (scriptFile == null) {
196                throw new IllegalArgumentException("scriptFile cannot be null");
197            }
198    
199            validateHttpSubmitConf(conf);
200    
201            String script = "";
202            String options = "";
203            String scriptParams = "";
204    
205            if (jobType.equals(OozieCLI.HIVE_CMD)) {
206                script = XOozieClient.HIVE_SCRIPT;
207                options = XOozieClient.HIVE_OPTIONS;
208                scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS;
209            }
210            else if (jobType.equals(OozieCLI.PIG_CMD)) {
211                script =  XOozieClient.PIG_SCRIPT;
212                options = XOozieClient.PIG_OPTIONS;
213                scriptParams = XOozieClient.PIG_SCRIPT_PARAMS;
214            }
215            else {
216                throw new IllegalArgumentException("jobType must be either pig or hive");
217            }
218    
219            conf.setProperty(script, readScript(scriptFile));
220            setStrings(conf, options, args);
221            setStrings(conf, scriptParams, params);
222    
223            return (new HttpJobSubmit(conf, jobType)).call();
224        }
225    
226        /**
227         * Submit a Map/Reduce job via HTTP.
228         *
229         * @param conf job configuration.
230         * @return the job Id.
231         * @throws OozieClientException thrown if the job could not be submitted.
232         */
233        public String submitMapReduce(Properties conf) throws OozieClientException {
234            if (conf == null) {
235                throw new IllegalArgumentException("conf cannot be null");
236            }
237    
238            validateHttpSubmitConf(conf);
239    
240            return (new HttpJobSubmit(conf, "mapreduce")).call();
241        }
242    
243        private class HttpJobSubmit extends ClientCallable<String> {
244            private Properties conf;
245    
246            HttpJobSubmit(Properties conf, String jobType) {
247                super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType));
248                this.conf = notNull(conf, "conf");
249            }
250    
251            @Override
252            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
253                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
254                writeToXml(conf, conn.getOutputStream());
255                if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
256                    JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
257                    return (String) json.get(JsonTags.JOB_ID);
258                }
259                if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
260                    handleError(conn);
261                }
262                return null;
263            }
264        }
265    
266        /**
267         * set LIBPATH for HTTP submission job.
268         *
269         * @param conf Configuration object.
270         * @param pathStr lib HDFS path.
271         */
272        public void setLib(Properties conf, String pathStr) {
273            conf.setProperty(LIBPATH, pathStr);
274        }
275    
276        /**
277         * The equivalent to <file> tag in oozie's workflow xml.
278         *
279         * @param conf Configuration object.
280         * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
281         *             For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as
282         *             symbolic link name.
283         */
284        public void addFile(Properties conf, String file) {
285            if (file == null || file.length() == 0) {
286                throw new IllegalArgumentException("file cannot be null or empty");
287            }
288            String files = conf.getProperty(FILES);
289            conf.setProperty(FILES, files == null ? file : files + "," + file);
290        }
291    
292        /**
293         * The equivalent to <archive> tag in oozie's workflow xml.
294         *
295         * @param conf Configuration object.
296         * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
297         *             For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as
298         *             symbolic link name.
299         */
300        public void addArchive(Properties conf, String file) {
301            if (file == null || file.length() == 0) {
302                throw new IllegalArgumentException("file cannot be null or empty");
303            }
304            String files = conf.getProperty(ARCHIVES);
305            conf.setProperty(ARCHIVES, files == null ? file : files + "," + file);
306        }
307    }