001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.client;
019    
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileReader;
023    import java.io.IOException;
024    import java.io.InputStreamReader;
025    import java.net.HttpURLConnection;
026    import java.util.Properties;
027    
028    import org.apache.oozie.client.rest.JsonTags;
029    import org.apache.oozie.client.rest.RestConstants;
030    import org.json.simple.JSONObject;
031    import org.json.simple.JSONValue;
032    
033    public class XOozieClient extends OozieClient {
034    
035        public static final String JT = "mapred.job.tracker";
036    
037        public static final String NN = "fs.default.name";
038    
039        public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
040    
041        public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
042    
043        public static final String PIG_SCRIPT = "oozie.pig.script";
044    
045        public static final String PIG_OPTIONS = "oozie.pig.options";
046    
047        public static final String FILES = "oozie.files";
048    
049        public static final String ARCHIVES = "oozie.archives";
050        
051        public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission";
052    
053        protected XOozieClient() {
054        }
055    
056        /**
057         * Create an eXtended Workflow client instance.
058         *
059         * @param oozieUrl URL of the Oozie instance it will interact with.
060         */
061        public XOozieClient(String oozieUrl) {
062            super(oozieUrl);
063        }
064    
065        private String readPigScript(String script) throws IOException {
066            if (!new File(script).exists()) {
067                throw new IOException("Error: Pig script file [" + script + "] does not exist");
068            }
069    
070            BufferedReader br = null;
071            try {
072                br = new BufferedReader(new FileReader(script));
073                StringBuilder sb = new StringBuilder();
074                String line;
075                while ((line = br.readLine()) != null) {
076                    sb.append(line + "\n");
077                }
078                return sb.toString();
079            }
080            finally {
081                try {
082                    br.close();
083                }
084                catch (IOException ex) {
085                    System.err.println("Error: " + ex.getMessage());
086                }
087            }
088        }
089    
090        static void setStrings(Properties conf, String key, String[] values) {
091            if (values != null) {
092                conf.setProperty(key + ".size", (new Integer(values.length)).toString());
093                for (int i = 0; i < values.length; i++) {
094                    conf.setProperty(key + "." + i, values[i]);
095                }
096            }
097        }
098    
099        private void validateHttpSubmitConf(Properties conf) {
100            String JT = conf.getProperty(XOozieClient.JT);
101            if (JT == null) {
102                throw new RuntimeException("jobtracker is not specified in conf");
103            }
104    
105            String NN = conf.getProperty(XOozieClient.NN);
106            if (NN == null) {
107                throw new RuntimeException("namenode is not specified in conf");
108            }
109    
110            String libPath = conf.getProperty(LIBPATH);
111            if (libPath == null) {
112                throw new RuntimeException("libpath is not specified in conf");
113            }
114            if (!libPath.startsWith("hdfs://")) {
115                String newLibPath = NN + libPath;
116                conf.setProperty(LIBPATH, newLibPath);
117            }
118            
119            conf.setProperty(IS_PROXY_SUBMISSION, "true");
120        }
121    
122        /**
123         * Submit a Pig job via HTTP.
124         *
125         * @param conf job configuration.
126         * @param pigScriptFile pig script file.
127         * @param pigArgs pig arguments string.
128         * @return the job Id.
129         * @throws OozieClientException thrown if the job could not be submitted.
130         */
131        public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException {
132            if (conf == null) {
133                throw new IllegalArgumentException("conf cannot be null");
134            }
135            if (pigScriptFile == null) {
136                throw new IllegalArgumentException("pigScriptFile cannot be null");
137            }
138    
139            validateHttpSubmitConf(conf);
140    
141            conf.setProperty(XOozieClient.PIG_SCRIPT, readPigScript(pigScriptFile));
142            setStrings(conf, XOozieClient.PIG_OPTIONS, pigArgs);
143    
144            return (new HttpJobSubmit(conf, "pig")).call();
145        }
146    
147        /**
148         * Submit a Map/Reduce job via HTTP.
149         *
150         * @param conf job configuration.
151         * @return the job Id.
152         * @throws OozieClientException thrown if the job could not be submitted.
153         */
154        public String submitMapReduce(Properties conf) throws OozieClientException {
155            if (conf == null) {
156                throw new IllegalArgumentException("conf cannot be null");
157            }
158    
159            validateHttpSubmitConf(conf);
160    
161            return (new HttpJobSubmit(conf, "mapreduce")).call();
162        }
163    
164        private class HttpJobSubmit extends ClientCallable<String> {
165            private Properties conf;
166    
167            HttpJobSubmit(Properties conf, String jobType) {
168                super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType));
169                this.conf = notNull(conf, "conf");
170            }
171    
172            @Override
173            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
174                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
175                writeToXml(conf, conn.getOutputStream());
176                if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
177                    JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
178                    return (String) json.get(JsonTags.JOB_ID);
179                }
180                if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
181                    handleError(conn);
182                }
183                return null;
184            }
185        }
186    
187        /**
188         * set LIBPATH for HTTP submission job.
189         *
190         * @param conf Configuration object.
191         * @param path lib HDFS path.
192         */
193        public void setLib(Properties conf, String pathStr) {
194            conf.setProperty(LIBPATH, pathStr);
195        }
196    
197        /**
198         * The equivalent to <file> tag in oozie's workflow xml.
199         *
200         * @param conf Configuration object.
201         * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
202         *             For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as
203         *             symbolic link name.
204         */
205        public void addFile(Properties conf, String file) {
206            if (file == null || file.length() == 0) {
207                throw new IllegalArgumentException("file cannot be null or empty");
208            }
209            String files = conf.getProperty(FILES);
210            conf.setProperty(FILES, files == null ? file : files + "," + file);
211        }
212    
213        /**
214         * The equivalent to <archive> tag in oozie's workflow xml.
215         *
216         * @param conf Configuration object.
217         * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
218         *             For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as
219         *             symbolic link name.
220         */
221        public void addArchive(Properties conf, String file) {
222            if (file == null || file.length() == 0) {
223                throw new IllegalArgumentException("file cannot be null or empty");
224            }
225            String files = conf.getProperty(ARCHIVES);
226            conf.setProperty(ARCHIVES, files == null ? file : files + "," + file);
227        }
228    }