001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.client;
019    
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileReader;
023    import java.io.IOException;
024    import java.io.InputStreamReader;
025    import java.net.HttpURLConnection;
026    import java.util.Properties;
027    
028    import org.apache.oozie.client.rest.JsonTags;
029    import org.apache.oozie.client.rest.RestConstants;
030    import org.json.simple.JSONObject;
031    import org.json.simple.JSONValue;
032    
033    public class XOozieClient extends OozieClient {
034    
035        public static final String JT = "mapred.job.tracker";
036        public static final String JT_2 = "mapreduce.jobtracker.address";
037    
038        public static final String NN = "fs.default.name";
039        public static final String NN_2 = "fs.defaultFS";
040    
041        @Deprecated
042        public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
043    
044        @Deprecated
045        public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
046    
047        public static final String PIG_SCRIPT = "oozie.pig.script";
048    
049        public static final String PIG_OPTIONS = "oozie.pig.options";
050    
051        public static final String FILES = "oozie.files";
052    
053        public static final String ARCHIVES = "oozie.archives";
054    
055        public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission";
056    
057        protected XOozieClient() {
058        }
059    
060        /**
061         * Create an eXtended Workflow client instance.
062         *
063         * @param oozieUrl URL of the Oozie instance it will interact with.
064         */
065        public XOozieClient(String oozieUrl) {
066            super(oozieUrl);
067        }
068    
069        private String readPigScript(String script) throws IOException {
070            if (!new File(script).exists()) {
071                throw new IOException("Error: Pig script file [" + script + "] does not exist");
072            }
073    
074            BufferedReader br = null;
075            try {
076                br = new BufferedReader(new FileReader(script));
077                StringBuilder sb = new StringBuilder();
078                String line;
079                while ((line = br.readLine()) != null) {
080                    sb.append(line + "\n");
081                }
082                return sb.toString();
083            }
084            finally {
085                try {
086                    br.close();
087                }
088                catch (IOException ex) {
089                    System.err.println("Error: " + ex.getMessage());
090                }
091            }
092        }
093    
094        static void setStrings(Properties conf, String key, String[] values) {
095            if (values != null) {
096                conf.setProperty(key + ".size", (new Integer(values.length)).toString());
097                for (int i = 0; i < values.length; i++) {
098                    conf.setProperty(key + "." + i, values[i]);
099                }
100            }
101        }
102    
103        private void validateHttpSubmitConf(Properties conf) {
104            String JT = conf.getProperty(XOozieClient.JT);
105            String JT_2 = conf.getProperty(XOozieClient.JT_2);
106            if (JT == null) {
107                if(JT_2 == null) {
108                    throw new RuntimeException("jobtracker is not specified in conf");
109                }
110            }
111    
112            String NN = conf.getProperty(XOozieClient.NN);
113            String NN_2 = conf.getProperty(XOozieClient.NN_2);
114            if (NN == null) {
115                if(NN_2 == null) {
116                    throw new RuntimeException("namenode is not specified in conf");
117                }
118            }
119    
120            String libPath = conf.getProperty(LIBPATH);
121            if (libPath == null) {
122                throw new RuntimeException("libpath is not specified in conf");
123            }
124            if (!libPath.contains(":/")) {
125                String newLibPath;
126                if (libPath.startsWith("/")) {
127                    if(NN.endsWith("/")) {
128                        newLibPath = NN + libPath.substring(1);
129                    } else {
130                        newLibPath = NN + libPath;
131                    }
132                } else {
133                    throw new RuntimeException("libpath should be absolute");
134                }
135                conf.setProperty(LIBPATH, newLibPath);
136            }
137    
138            conf.setProperty(IS_PROXY_SUBMISSION, "true");
139        }
140    
141        /**
142         * Submit a Pig job via HTTP.
143         *
144         * @param conf job configuration.
145         * @param pigScriptFile pig script file.
146         * @param pigArgs pig arguments string.
147         * @return the job Id.
148         * @throws OozieClientException thrown if the job could not be submitted.
149         */
150        public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException {
151            if (conf == null) {
152                throw new IllegalArgumentException("conf cannot be null");
153            }
154            if (pigScriptFile == null) {
155                throw new IllegalArgumentException("pigScriptFile cannot be null");
156            }
157    
158            validateHttpSubmitConf(conf);
159    
160            conf.setProperty(XOozieClient.PIG_SCRIPT, readPigScript(pigScriptFile));
161            setStrings(conf, XOozieClient.PIG_OPTIONS, pigArgs);
162    
163            return (new HttpJobSubmit(conf, "pig")).call();
164        }
165    
166        /**
167         * Submit a Map/Reduce job via HTTP.
168         *
169         * @param conf job configuration.
170         * @return the job Id.
171         * @throws OozieClientException thrown if the job could not be submitted.
172         */
173        public String submitMapReduce(Properties conf) throws OozieClientException {
174            if (conf == null) {
175                throw new IllegalArgumentException("conf cannot be null");
176            }
177    
178            validateHttpSubmitConf(conf);
179    
180            return (new HttpJobSubmit(conf, "mapreduce")).call();
181        }
182    
183        private class HttpJobSubmit extends ClientCallable<String> {
184            private Properties conf;
185    
186            HttpJobSubmit(Properties conf, String jobType) {
187                super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType));
188                this.conf = notNull(conf, "conf");
189            }
190    
191            @Override
192            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
193                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
194                writeToXml(conf, conn.getOutputStream());
195                if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
196                    JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
197                    return (String) json.get(JsonTags.JOB_ID);
198                }
199                if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
200                    handleError(conn);
201                }
202                return null;
203            }
204        }
205    
206        /**
207         * set LIBPATH for HTTP submission job.
208         *
209         * @param conf Configuration object.
210         * @param pathStr lib HDFS path.
211         */
212        public void setLib(Properties conf, String pathStr) {
213            conf.setProperty(LIBPATH, pathStr);
214        }
215    
216        /**
217         * The equivalent to <file> tag in oozie's workflow xml.
218         *
219         * @param conf Configuration object.
220         * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
221         *             For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as
222         *             symbolic link name.
223         */
224        public void addFile(Properties conf, String file) {
225            if (file == null || file.length() == 0) {
226                throw new IllegalArgumentException("file cannot be null or empty");
227            }
228            String files = conf.getProperty(FILES);
229            conf.setProperty(FILES, files == null ? file : files + "," + file);
230        }
231    
232        /**
233         * The equivalent to <archive> tag in oozie's workflow xml.
234         *
235         * @param conf Configuration object.
236         * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
237         *             For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as
238         *             symbolic link name.
239         */
240        public void addArchive(Properties conf, String file) {
241            if (file == null || file.length() == 0) {
242                throw new IllegalArgumentException("file cannot be null or empty");
243            }
244            String files = conf.getProperty(ARCHIVES);
245            conf.setProperty(ARCHIVES, files == null ? file : files + "," + file);
246        }
247    }