001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.client;
019    
020    import java.io.BufferedReader;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.InputStreamReader;
024    import java.io.OutputStream;
025    import java.io.PrintStream;
026    import java.io.Reader;
027    import java.net.HttpURLConnection;
028    import java.net.URL;
029    import java.net.URLEncoder;
030    import java.util.ArrayList;
031    import java.util.Collections;
032    import java.util.HashMap;
033    import java.util.Iterator;
034    import java.util.LinkedHashMap;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.Properties;
038    import java.util.concurrent.Callable;
039    
040    import javax.xml.parsers.DocumentBuilderFactory;
041    import javax.xml.transform.Transformer;
042    import javax.xml.transform.TransformerFactory;
043    import javax.xml.transform.dom.DOMSource;
044    import javax.xml.transform.stream.StreamResult;
045    
046    import org.apache.oozie.BuildInfo;
047    import org.apache.oozie.client.rest.JsonTags;
048    import org.apache.oozie.client.rest.JsonToBean;
049    import org.apache.oozie.client.rest.RestConstants;
050    import org.json.simple.JSONArray;
051    import org.json.simple.JSONObject;
052    import org.json.simple.JSONValue;
053    import org.w3c.dom.Document;
054    import org.w3c.dom.Element;
055    
056    /**
057     * Client API to submit and manage Oozie workflow jobs against an Oozie instance.
058     * <p/>
059     * This class is thread safe.
060     * <p/>
061     * Syntax for filter for the {@link #getJobsInfo(String)} {@link #getJobsInfo(String, int, int)} methods:
062     * <code>[NAME=VALUE][;NAME=VALUE]*</code>.
063     * <p/>
064     * Valid filter names are:
065     * <p/>
066     * <ul>
067     * <li>name: the workflow application name from the workflow definition.</li>
068     * <li>user: the user that submitted the job.</li>
069     * <li>group: the group for the job.</li>
070     * <li>status: the status of the job.</li>
071     * </ul>
072     * <p/>
073     * The query will do an AND among all the filter names. The query will do an OR among all the filter values for the same
074     * name. Multiple values must be specified as different name value pairs.
075     */
076    public class OozieClient {
077    
078        public static final long WS_PROTOCOL_VERSION_0 = 0; // older protocol version; validateWSVersion() falls back to it
079    
080        public static final long WS_PROTOCOL_VERSION = 1; // preferred protocol version; validateWSVersion() checks it first
081    
082        public static final String USER_NAME = "user.name"; // property set by createConfiguration()
083    
084        @Deprecated
085        public static final String GROUP_NAME = "group.name";
086    
087        public static final String JOB_ACL = "oozie.job.acl";
088    
089        public static final String APP_PATH = "oozie.wf.application.path";
090    
091        public static final String COORDINATOR_APP_PATH = "oozie.coord.application.path";
092    
093        public static final String BUNDLE_APP_PATH = "oozie.bundle.application.path";
094    
095        public static final String BUNDLE_ID = "oozie.bundle.id";
096    
097        public static final String EXTERNAL_ID = "oozie.wf.external.id";
098    
099        public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url";
100    
101        public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url";
102    
103        public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url";
104    
105        public static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes";
106    
107        public static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes";
108    
109        public static final String LOG_TOKEN = "oozie.wf.log.token";
110    
111        public static final String ACTION_MAX_RETRIES = "oozie.wf.action.max.retries";
112    
113        public static final String ACTION_RETRY_INTERVAL = "oozie.wf.action.retry.interval";
114    
115        public static final String FILTER_USER = "user"; // filter name listed in the class Javadoc
116    
117        public static final String FILTER_GROUP = "group"; // filter name listed in the class Javadoc
118    
119        public static final String FILTER_NAME = "name"; // filter name listed in the class Javadoc
120    
121        public static final String FILTER_STATUS = "status"; // filter name listed in the class Javadoc
122    
123        public static final String FILTER_FREQUENCY = "frequency";
124    
125        public static final String FILTER_ID = "id";
126    
127        public static final String FILTER_UNIT = "unit";
128    
129        public static final String CHANGE_VALUE_ENDTIME = "endtime";
130    
131        public static final String CHANGE_VALUE_PAUSETIME = "pausetime";
132    
133        public static final String CHANGE_VALUE_CONCURRENCY = "concurrency";
134    
135        public static final String LIBPATH = "oozie.libpath";
136    
137        public static final String USE_SYSTEM_LIBPATH = "oozie.use.system.libpath";
138    
139        public static enum SYSTEM_MODE {
140            NORMAL, NOWEBSERVICE, SAFEMODE
141        };
142    
143        /**
144         * debugMode =0 means no debugging. > 0 means debugging on.
145         */
146        public int debugMode = 0; // when > 0, ClientCallable.call() prints the connection URL to System.out
147    
148        private String baseUrl; // the String constructor stores it with a trailing "/"
149        private String protocolUrl; // baseUrl + "v<version>/"; assigned by validateWSVersion()
150        private boolean validatedVersion = false; // set to true at the end of validateWSVersion()
151        private final Map<String, String> headers = new HashMap<String, String>(); // copied onto every connection by createConnection()
152    
153        private static ThreadLocal<String> USER_NAME_TL = new ThreadLocal<String>(); // per-thread user set by doAs(); read by createConfiguration() and prepareParams()
154    
155        /**
156         * Allows to impersonate other users in the Oozie server. The current user
157         * must be configured as a proxyuser in Oozie.
158         * <p/>
159         * IMPORTANT: impersonation happens only with Oozie client requests done within
160         * doAs() calls.
161         *
162         * @param userName user to impersonate.
163         * @param callable callable with {@link OozieClient} calls impersonating the specified user.
164         * @return any response returned by the {@link Callable#call()} method.
165         * @throws Exception thrown by the {@link Callable#call()} method.
166         */
167        public static <T> T doAs(String userName, Callable<T> callable) throws Exception {
168            notEmpty(userName, "userName");
169            notNull(callable, "callable");
170            try {
171                USER_NAME_TL.set(userName);
172                return callable.call();
173            }
174            finally {
175                USER_NAME_TL.remove();
176            }
177        }
178    
/**
 * No-argument constructor, reachable only from subclasses and the package; it does not
 * assign the Oozie URL.
 */
protected OozieClient() {
    // intentionally empty
}
181    
/**
 * Creates a client bound to one Oozie instance.
 *
 * @param oozieUrl URL of the Oozie instance to talk to; a trailing "/" is appended when
 * the given value does not already end with one.
 */
public OozieClient(String oozieUrl) {
    String checkedUrl = notEmpty(oozieUrl, "oozieUrl");
    this.baseUrl = checkedUrl.endsWith("/") ? checkedUrl : checkedUrl + "/";
}
193    
194        /**
195         * Return the Oozie URL of the workflow client instance.
196         * <p/>
197         * This URL is the base URL fo the Oozie system, with not protocol versioning.
198         *
199         * @return the Oozie URL of the workflow client instance.
200         */
201        public String getOozieUrl() {
202            return baseUrl;
203        }
204    
205        /**
206         * Return the Oozie URL used by the client and server for WS communications.
207         * <p/>
208         * This URL is the original URL plus the versioning element path.
209         *
210         * @return the Oozie URL used by the client and server for communication.
211         * @throws OozieClientException thrown in the client and the server are not protocol compatible.
212         */
213        public String getProtocolUrl() throws OozieClientException {
214            validateWSVersion();
215            return protocolUrl;
216        }
217    
218        /**
219         * @return current debug Mode
220         */
221        public int getDebugMode() {
222            return debugMode;
223        }
224    
225        /**
226         * Set debug mode.
227         *
228         * @param debugMode : 0 means no debugging. > 0 means debugging
229         */
230        public void setDebugMode(int debugMode) {
231            this.debugMode = debugMode;
232        }
233    
234        /**
235         * Validate that the Oozie client and server instances are protocol compatible.
236         *
237         * @throws OozieClientException thrown in the client and the server are not protocol compatible.
238         */
239        public synchronized void validateWSVersion() throws OozieClientException {
240            if (!validatedVersion) {
241                try {
242                    URL url = new URL(baseUrl + RestConstants.VERSIONS);
243                    HttpURLConnection conn = createConnection(url, "GET");
244                    if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
245                        JSONArray array = (JSONArray) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
246                        if (array == null) {
247                            throw new OozieClientException("HTTP error", "no response message");
248                        }
249                        if (!array.contains(WS_PROTOCOL_VERSION) && !array.contains(WS_PROTOCOL_VERSION_0)) {
250                            StringBuilder msg = new StringBuilder();
251                            msg.append("Supported version [").append(WS_PROTOCOL_VERSION).append(
252                                    "] or less, Unsupported versions[");
253                            String separator = "";
254                            for (Object version : array) {
255                                msg.append(separator).append(version);
256                            }
257                            msg.append("]");
258                            throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, msg.toString());
259                        }
260                        if (array.contains(WS_PROTOCOL_VERSION)) {
261                            protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION + "/";
262                        }
263                        else {
264                            if (array.contains(WS_PROTOCOL_VERSION_0)) {
265                                protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_0 + "/";
266                            }
267                        }
268                    }
269                    else {
270                        handleError(conn);
271                    }
272                }
273                catch (IOException ex) {
274                    throw new OozieClientException(OozieClientException.IO_ERROR, ex);
275                }
276                validatedVersion = true;
277            }
278        }
279    
280        /**
281         * Create an empty configuration with just the {@link #USER_NAME} set to the JVM user name.
282         *
283         * @return an empty configuration.
284         */
285        public Properties createConfiguration() {
286            Properties conf = new Properties();
287            String userName = USER_NAME_TL.get();
288            if (userName == null) {
289                userName = System.getProperty("user.name");
290            }
291            conf.setProperty(USER_NAME, userName);
292            return conf;
293        }
294    
295        /**
296         * Set a HTTP header to be used in the WS requests by the workflow instance.
297         *
298         * @param name header name.
299         * @param value header value.
300         */
301        public void setHeader(String name, String value) {
302            headers.put(notEmpty(name, "name"), notNull(value, "value"));
303        }
304    
305        /**
306         * Get the value of a set HTTP header from the workflow instance.
307         *
308         * @param name header name.
309         * @return header value, <code>null</code> if not set.
310         */
311        public String getHeader(String name) {
312            return headers.get(notEmpty(name, "name"));
313        }
314    
315        /**
316         * Get the set HTTP header
317         *
318         * @return map of header key and value
319         */
320        public Map<String, String> getHeaders() {
321            return headers;
322        }
323    
324        /**
325         * Remove a HTTP header from the workflow client instance.
326         *
327         * @param name header name.
328         */
329        public void removeHeader(String name) {
330            headers.remove(notEmpty(name, "name"));
331        }
332    
333        /**
334         * Return an iterator with all the header names set in the workflow instance.
335         *
336         * @return header names.
337         */
338        public Iterator<String> getHeaderNames() {
339            return Collections.unmodifiableMap(headers).keySet().iterator();
340        }
341    
342        private URL createURL(String collection, String resource, Map<String, String> parameters) throws IOException,
343                OozieClientException {
344            validateWSVersion();
345            StringBuilder sb = new StringBuilder();
346            sb.append(protocolUrl).append(collection);
347            if (resource != null && resource.length() > 0) {
348                sb.append("/").append(resource);
349            }
350            if (parameters.size() > 0) {
351                String separator = "?";
352                for (Map.Entry<String, String> param : parameters.entrySet()) {
353                    if (param.getValue() != null) {
354                        sb.append(separator).append(URLEncoder.encode(param.getKey(), "UTF-8")).append("=").append(
355                                URLEncoder.encode(param.getValue(), "UTF-8"));
356                        separator = "&";
357                    }
358                }
359            }
360            return new URL(sb.toString());
361        }
362    
363        private boolean validateCommand(String url) {
364            {
365                if (protocolUrl.contains(baseUrl + "v0")) {
366                    if (url.contains("dryrun") || url.contains("jobtype=c") || url.contains("systemmode")) {
367                        return false;
368                    }
369                }
370            }
371            return true;
372        }
373    
374        /**
375         * Create http connection to oozie server.
376         *
377         * @param url
378         * @param method
379         * @return connection
380         * @throws IOException
381         * @throws OozieClientException
382         */
383        protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException {
384            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
385            conn.setRequestMethod(method);
386            if (method.equals("POST") || method.equals("PUT")) {
387                conn.setDoOutput(true);
388            }
389            for (Map.Entry<String, String> header : headers.entrySet()) {
390                conn.setRequestProperty(header.getKey(), header.getValue());
391            }
392            return conn;
393        }
394    
395        protected abstract class ClientCallable<T> implements Callable<T> {
396            private final String method;
397            private final String collection;
398            private final String resource;
399            private final Map<String, String> params;
400    
401            public ClientCallable(String method, String collection, String resource, Map<String, String> params) {
402                this.method = method;
403                this.collection = collection;
404                this.resource = resource;
405                this.params = params;
406            }
407    
408            public T call() throws OozieClientException {
409                try {
410                    URL url = createURL(collection, resource, params);
411                    if (validateCommand(url.toString())) {
412                        if (getDebugMode() > 0) {
413                            System.out.println("Connection URL:[" + url + "]");
414                        }
415                        HttpURLConnection conn = createConnection(url, method);
416                        return call(conn);
417                    }
418                    else {
419                        System.out.println("Option not supported in target server. Supported only on Oozie-2.0 or greater."
420                                + " Use 'oozie help' for details");
421                        throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, new Exception());
422                    }
423                }
424                catch (IOException ex) {
425                    throw new OozieClientException(OozieClientException.IO_ERROR, ex);
426                }
427    
428            }
429    
430            protected abstract T call(HttpURLConnection conn) throws IOException, OozieClientException;
431        }
432    
433        static void handleError(HttpURLConnection conn) throws IOException, OozieClientException {
434            int status = conn.getResponseCode();
435            String error = conn.getHeaderField(RestConstants.OOZIE_ERROR_CODE);
436            String message = conn.getHeaderField(RestConstants.OOZIE_ERROR_MESSAGE);
437    
438            if (error == null) {
439                error = "HTTP error code: " + status;
440            }
441    
442            if (message == null) {
443                message = conn.getResponseMessage();
444            }
445            throw new OozieClientException(error, message);
446        }
447    
448        static Map<String, String> prepareParams(String... params) {
449            Map<String, String> map = new LinkedHashMap<String, String>();
450            for (int i = 0; i < params.length; i = i + 2) {
451                map.put(params[i], params[i + 1]);
452            }
453            String doAsUserName = USER_NAME_TL.get();
454            if (doAsUserName != null) {
455                map.put(RestConstants.DO_AS_PARAM, doAsUserName);
456            }
457            return map;
458        }
459    
460        public void writeToXml(Properties props, OutputStream out) throws IOException {
461            try {
462                Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
463                Element conf = doc.createElement("configuration");
464                doc.appendChild(conf);
465                conf.appendChild(doc.createTextNode("\n"));
466                for (String name : props.stringPropertyNames()) { // Properties whose key or value is not of type String are omitted.
467                    String value = props.getProperty(name);
468                    Element propNode = doc.createElement("property");
469                    conf.appendChild(propNode);
470    
471                    Element nameNode = doc.createElement("name");
472                    nameNode.appendChild(doc.createTextNode(name.trim()));
473                    propNode.appendChild(nameNode);
474    
475                    Element valueNode = doc.createElement("value");
476                    valueNode.appendChild(doc.createTextNode(value.trim()));
477                    propNode.appendChild(valueNode);
478    
479                    conf.appendChild(doc.createTextNode("\n"));
480                }
481    
482                DOMSource source = new DOMSource(doc);
483                StreamResult result = new StreamResult(out);
484                TransformerFactory transFactory = TransformerFactory.newInstance();
485                Transformer transformer = transFactory.newTransformer();
486                transformer.transform(source, result);
487            }
488            catch (Exception e) {
489                throw new IOException(e);
490            }
491        }
492    
493        private class JobSubmit extends ClientCallable<String> {
494            private final Properties conf;
495    
496            JobSubmit(Properties conf, boolean start) {
497                super("POST", RestConstants.JOBS, "", (start) ? prepareParams(RestConstants.ACTION_PARAM,
498                        RestConstants.JOB_ACTION_START) : prepareParams());
499                this.conf = notNull(conf, "conf");
500            }
501    
502            JobSubmit(String jobId, Properties conf) {
503                super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
504                        RestConstants.JOB_ACTION_RERUN));
505                this.conf = notNull(conf, "conf");
506            }
507    
508            public JobSubmit(Properties conf, String jobActionDryrun) {
509                super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.ACTION_PARAM,
510                        RestConstants.JOB_ACTION_DRYRUN));
511                this.conf = notNull(conf, "conf");
512            }
513    
514            @Override
515            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
516                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
517                writeToXml(conf, conn.getOutputStream());
518                if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
519                    JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
520                    return (String) json.get(JsonTags.JOB_ID);
521                }
522                if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
523                    handleError(conn);
524                }
525                return null;
526            }
527        }
528    
529        /**
530         * Submit a workflow job.
531         *
532         * @param conf job configuration.
533         * @return the job Id.
534         * @throws OozieClientException thrown if the job could not be submitted.
535         */
536        public String submit(Properties conf) throws OozieClientException {
537            return (new JobSubmit(conf, false)).call();
538        }
539    
540        private class JobAction extends ClientCallable<Void> {
541    
542            JobAction(String jobId, String action) {
543                super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action));
544            }
545    
546            JobAction(String jobId, String action, String params) {
547                super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action,
548                        RestConstants.JOB_CHANGE_VALUE, params));
549            }
550    
551            @Override
552            protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
553                if (!(conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
554                    handleError(conn);
555                }
556                return null;
557            }
558        }
559    
560        /**
561         * dryrun for a given job
562         *
563         * @param conf Job configuration.
564         */
565        public String dryrun(Properties conf) throws OozieClientException {
566            return new JobSubmit(conf, RestConstants.JOB_ACTION_DRYRUN).call();
567        }
568    
569        /**
570         * Start a workflow job.
571         *
572         * @param jobId job Id.
573         * @throws OozieClientException thrown if the job could not be started.
574         */
575        public void start(String jobId) throws OozieClientException {
576            new JobAction(jobId, RestConstants.JOB_ACTION_START).call();
577        }
578    
579        /**
580         * Submit and start a workflow job.
581         *
582         * @param conf job configuration.
583         * @return the job Id.
584         * @throws OozieClientException thrown if the job could not be submitted.
585         */
586        public String run(Properties conf) throws OozieClientException {
587            return (new JobSubmit(conf, true)).call();
588        }
589    
590        /**
591         * Rerun a workflow job.
592         *
593         * @param jobId job Id to rerun.
594         * @param conf configuration information for the rerun.
595         * @throws OozieClientException thrown if the job could not be started.
596         */
597        public void reRun(String jobId, Properties conf) throws OozieClientException {
598            new JobSubmit(jobId, conf).call();
599        }
600    
601        /**
602         * Suspend a workflow job.
603         *
604         * @param jobId job Id.
605         * @throws OozieClientException thrown if the job could not be suspended.
606         */
607        public void suspend(String jobId) throws OozieClientException {
608            new JobAction(jobId, RestConstants.JOB_ACTION_SUSPEND).call();
609        }
610    
611        /**
612         * Resume a workflow job.
613         *
614         * @param jobId job Id.
615         * @throws OozieClientException thrown if the job could not be resume.
616         */
617        public void resume(String jobId) throws OozieClientException {
618            new JobAction(jobId, RestConstants.JOB_ACTION_RESUME).call();
619        }
620    
621        /**
622         * Kill a workflow job.
623         *
624         * @param jobId job Id.
625         * @throws OozieClientException thrown if the job could not be killed.
626         */
627        public void kill(String jobId) throws OozieClientException {
628            new JobAction(jobId, RestConstants.JOB_ACTION_KILL).call();
629        }
630    
631        /**
632         * Change a coordinator job.
633         *
634         * @param jobId job Id.
635         * @param changeValue change value.
636         * @throws OozieClientException thrown if the job could not be changed.
637         */
638        public void change(String jobId, String changeValue) throws OozieClientException {
639            new JobAction(jobId, RestConstants.JOB_ACTION_CHANGE, changeValue).call();
640        }
641    
642        private class JobInfo extends ClientCallable<WorkflowJob> {
643    
644            JobInfo(String jobId, int start, int len) {
645                super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
646                        RestConstants.JOB_SHOW_INFO, RestConstants.OFFSET_PARAM, Integer.toString(start),
647                        RestConstants.LEN_PARAM, Integer.toString(len)));
648            }
649    
650            @Override
651            protected WorkflowJob call(HttpURLConnection conn) throws IOException, OozieClientException {
652                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
653                    Reader reader = new InputStreamReader(conn.getInputStream());
654                    JSONObject json = (JSONObject) JSONValue.parse(reader);
655                    return JsonToBean.createWorkflowJob(json);
656                }
657                else {
658                    handleError(conn);
659                }
660                return null;
661            }
662        }
663    
664        private class WorkflowActionInfo extends ClientCallable<WorkflowAction> {
665            WorkflowActionInfo(String actionId) {
666                super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
667                        RestConstants.JOB_SHOW_INFO));
668            }
669    
670            @Override
671            protected WorkflowAction call(HttpURLConnection conn) throws IOException, OozieClientException {
672                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
673                    Reader reader = new InputStreamReader(conn.getInputStream());
674                    JSONObject json = (JSONObject) JSONValue.parse(reader);
675                    return JsonToBean.createWorkflowAction(json);
676                }
677                else {
678                    handleError(conn);
679                }
680                return null;
681            }
682        }
683    
684        /**
685         * Get the info of a workflow job.
686         *
687         * @param jobId job Id.
688         * @return the job info.
689         * @throws OozieClientException thrown if the job info could not be retrieved.
690         */
691        public WorkflowJob getJobInfo(String jobId) throws OozieClientException {
692            return getJobInfo(jobId, 0, 0);
693        }
694    
695        /**
696         * Get the info of a workflow job and subset actions.
697         *
698         * @param jobId job Id.
699         * @param start starting index in the list of actions belonging to the job
700         * @param len number of actions to be returned
701         * @return the job info.
702         * @throws OozieClientException thrown if the job info could not be retrieved.
703         */
704        public WorkflowJob getJobInfo(String jobId, int start, int len) throws OozieClientException {
705            return new JobInfo(jobId, start, len).call();
706        }
707    
708        /**
709         * Get the info of a workflow action.
710         *
711         * @param actionId Id.
712         * @return the workflow action info.
713         * @throws OozieClientException thrown if the job info could not be retrieved.
714         */
715        public WorkflowAction getWorkflowActionInfo(String actionId) throws OozieClientException {
716            return new WorkflowActionInfo(actionId).call();
717        }
718    
719        /**
720         * Get the log of a workflow job.
721         *
722         * @param jobId job Id.
723         * @return the job log.
724         * @throws OozieClientException thrown if the job info could not be retrieved.
725         */
726        public String getJobLog(String jobId) throws OozieClientException {
727            return new JobLog(jobId).call();
728        }
729    
730        /**
731         * Get the log of a job.
732         *
733         * @param jobId job Id.
734         * @param logRetrievalType Based on which filter criteria the log is retrieved
735         * @param logRetrievalScope Value for the retrieval type
736         * @param ps Printstream of command line interface
737         * @throws OozieClientException thrown if the job info could not be retrieved.
738         */
739        public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps)
740                throws OozieClientException {
741            new JobLog(jobId, logRetrievalType, logRetrievalScope, ps).call();
742        }
743    
744        private class JobLog extends JobMetadata {
745            JobLog(String jobId) {
746                super(jobId, RestConstants.JOB_SHOW_LOG);
747            }
748    
749            JobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) {
750                super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, ps);
751            }
752        }
753    
754        /**
755         * Get the definition of a workflow job.
756         *
757         * @param jobId job Id.
758         * @return the job log.
759         * @throws OozieClientException thrown if the job info could not be retrieved.
760         */
761        public String getJobDefinition(String jobId) throws OozieClientException {
762            return new JobDefinition(jobId).call();
763        }
764    
765        private class JobDefinition extends JobMetadata {
766    
767            JobDefinition(String jobId) {
768                super(jobId, RestConstants.JOB_SHOW_DEFINITION);
769            }
770        }
771    
772        private class JobMetadata extends ClientCallable<String> {
773            PrintStream printStream;
774    
775            JobMetadata(String jobId, String metaType) {
776                super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
777                        metaType));
778            }
779    
780            JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, PrintStream ps) {
781                super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
782                        metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM,
783                        logRetrievalScope));
784                printStream = ps;
785            }
786    
787            @Override
788            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
789                String returnVal = null;
790                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
791                    InputStream is = conn.getInputStream();
792                    InputStreamReader isr = new InputStreamReader(is);
793                    try {
794                        if (printStream != null) {
795                            sendToOutputStream(isr, -1);
796                        }
797                        else {
798                            returnVal = getReaderAsString(isr, -1);
799                        }
800                    }
801                    finally {
802                        isr.close();
803                    }
804                }
805                else {
806                    handleError(conn);
807                }
808                return returnVal;
809            }
810    
811            /**
812             * Output the log to command line interface
813             *
814             * @param reader reader to read into a string.
815             * @param maxLen max content length allowed, if -1 there is no limit.
816             * @param ps Printstream of command line interface
817             * @throws IOException
818             */
819            private void sendToOutputStream(Reader reader, int maxLen) throws IOException {
820                if (reader == null) {
821                    throw new IllegalArgumentException("reader cannot be null");
822                }
823                StringBuilder sb = new StringBuilder();
824                char[] buffer = new char[2048];
825                int read;
826                int count = 0;
827                int noOfCharstoFlush = 1024;
828                while ((read = reader.read(buffer)) > -1) {
829                    count += read;
830                    if ((maxLen > -1) && (count > maxLen)) {
831                        break;
832                    }
833                    sb.append(buffer, 0, read);
834                    if (sb.length() > noOfCharstoFlush) {
835                        printStream.print(sb.toString());
836                        sb = new StringBuilder("");
837                    }
838                }
839                printStream.print(sb.toString());
840            }
841    
842            /**
843             * Return a reader as string.
844             * <p/>
845             *
846             * @param reader reader to read into a string.
847             * @param maxLen max content length allowed, if -1 there is no limit.
848             * @return the reader content.
849             * @throws IOException thrown if the resource could not be read.
850             */
851            private String getReaderAsString(Reader reader, int maxLen) throws IOException {
852                if (reader == null) {
853                    throw new IllegalArgumentException("reader cannot be null");
854                }
855                StringBuffer sb = new StringBuffer();
856                char[] buffer = new char[2048];
857                int read;
858                int count = 0;
859                while ((read = reader.read(buffer)) > -1) {
860                    count += read;
861    
862                    // read up to maxLen chars;
863                    if ((maxLen > -1) && (count > maxLen)) {
864                        break;
865                    }
866                    sb.append(buffer, 0, read);
867                }
868                reader.close();
869                return sb.toString();
870            }
871        }
872    
873        private class CoordJobInfo extends ClientCallable<CoordinatorJob> {
874    
875            CoordJobInfo(String jobId, String filter, int start, int len) {
876                super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
877                        RestConstants.JOB_SHOW_INFO, RestConstants.JOB_FILTER_PARAM, filter, RestConstants.OFFSET_PARAM, Integer.toString(start),
878                        RestConstants.LEN_PARAM, Integer.toString(len)));
879            }
880    
881            @Override
882            protected CoordinatorJob call(HttpURLConnection conn) throws IOException, OozieClientException {
883                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
884                    Reader reader = new InputStreamReader(conn.getInputStream());
885                    JSONObject json = (JSONObject) JSONValue.parse(reader);
886                    return JsonToBean.createCoordinatorJob(json);
887                }
888                else {
889                    handleError(conn);
890                }
891                return null;
892            }
893        }
894    
895        private class BundleJobInfo extends ClientCallable<BundleJob> {
896    
897            BundleJobInfo(String jobId) {
898                super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
899                        RestConstants.JOB_SHOW_INFO));
900            }
901    
902            @Override
903            protected BundleJob call(HttpURLConnection conn) throws IOException, OozieClientException {
904                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
905                    Reader reader = new InputStreamReader(conn.getInputStream());
906                    JSONObject json = (JSONObject) JSONValue.parse(reader);
907                    return JsonToBean.createBundleJob(json);
908                }
909                else {
910                    handleError(conn);
911                }
912                return null;
913            }
914        }
915    
916        private class CoordActionInfo extends ClientCallable<CoordinatorAction> {
917            CoordActionInfo(String actionId) {
918                super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
919                        RestConstants.JOB_SHOW_INFO));
920            }
921    
922            @Override
923            protected CoordinatorAction call(HttpURLConnection conn) throws IOException, OozieClientException {
924                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
925                    Reader reader = new InputStreamReader(conn.getInputStream());
926                    JSONObject json = (JSONObject) JSONValue.parse(reader);
927                    return JsonToBean.createCoordinatorAction(json);
928                }
929                else {
930                    handleError(conn);
931                }
932                return null;
933            }
934        }
935    
936        /**
937         * Get the info of a bundle job.
938         *
939         * @param jobId job Id.
940         * @return the job info.
941         * @throws OozieClientException thrown if the job info could not be retrieved.
942         */
943        public BundleJob getBundleJobInfo(String jobId) throws OozieClientException {
944            return new BundleJobInfo(jobId).call();
945        }
946    
947        /**
948         * Get the info of a coordinator job.
949         *
950         * @param jobId job Id.
951         * @return the job info.
952         * @throws OozieClientException thrown if the job info could not be retrieved.
953         */
954        public CoordinatorJob getCoordJobInfo(String jobId) throws OozieClientException {
955            return new CoordJobInfo(jobId, null, 0, 0).call();
956        }
957    
958        /**
959         * Get the info of a coordinator job and subset actions.
960         *
961         * @param jobId job Id.
962         * @param filter filter the status filter
963         * @param start starting index in the list of actions belonging to the job
964         * @param len number of actions to be returned
965         * @return the job info.
966         * @throws OozieClientException thrown if the job info could not be retrieved.
967         */
968        public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len) throws OozieClientException {
969            return new CoordJobInfo(jobId, filter, start, len).call();
970        }
971    
972        /**
973         * Get the info of a coordinator action.
974         *
975         * @param actionId Id.
976         * @return the coordinator action info.
977         * @throws OozieClientException thrown if the job info could not be retrieved.
978         */
979        public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException {
980            return new CoordActionInfo(actionId).call();
981        }
982    
983        private class JobsStatus extends ClientCallable<List<WorkflowJob>> {
984    
985            JobsStatus(String filter, int start, int len) {
986                super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
987                        RestConstants.JOBTYPE_PARAM, "wf", RestConstants.OFFSET_PARAM, Integer.toString(start),
988                        RestConstants.LEN_PARAM, Integer.toString(len)));
989            }
990    
991            @Override
992            @SuppressWarnings("unchecked")
993            protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
994                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
995                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
996                    Reader reader = new InputStreamReader(conn.getInputStream());
997                    JSONObject json = (JSONObject) JSONValue.parse(reader);
998                    JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS);
999                    if (workflows == null) {
1000                        workflows = new JSONArray();
1001                    }
1002                    return JsonToBean.createWorkflowJobList(workflows);
1003                }
1004                else {
1005                    handleError(conn);
1006                }
1007                return null;
1008            }
1009        }
1010    
1011        private class CoordJobsStatus extends ClientCallable<List<CoordinatorJob>> {
1012    
1013            CoordJobsStatus(String filter, int start, int len) {
1014                super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1015                        RestConstants.JOBTYPE_PARAM, "coord", RestConstants.OFFSET_PARAM, Integer.toString(start),
1016                        RestConstants.LEN_PARAM, Integer.toString(len)));
1017            }
1018    
1019            @Override
1020            protected List<CoordinatorJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1021                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1022                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1023                    Reader reader = new InputStreamReader(conn.getInputStream());
1024                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1025                    JSONArray jobs = (JSONArray) json.get(JsonTags.COORDINATOR_JOBS);
1026                    if (jobs == null) {
1027                        jobs = new JSONArray();
1028                    }
1029                    return JsonToBean.createCoordinatorJobList(jobs);
1030                }
1031                else {
1032                    handleError(conn);
1033                }
1034                return null;
1035            }
1036        }
1037    
1038        private class BundleJobsStatus extends ClientCallable<List<BundleJob>> {
1039    
1040            BundleJobsStatus(String filter, int start, int len) {
1041                super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1042                        RestConstants.JOBTYPE_PARAM, "bundle", RestConstants.OFFSET_PARAM, Integer.toString(start),
1043                        RestConstants.LEN_PARAM, Integer.toString(len)));
1044            }
1045    
1046            @Override
1047            protected List<BundleJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1048                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1049                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1050                    Reader reader = new InputStreamReader(conn.getInputStream());
1051                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1052                    JSONArray jobs = (JSONArray) json.get(JsonTags.BUNDLE_JOBS);
1053                    if (jobs == null) {
1054                        jobs = new JSONArray();
1055                    }
1056                    return JsonToBean.createBundleJobList(jobs);
1057                }
1058                else {
1059                    handleError(conn);
1060                }
1061                return null;
1062            }
1063        }
1064    
1065        private class BulkResponseStatus extends ClientCallable<List<BulkResponse>> {
1066    
1067            BulkResponseStatus(String filter, int start, int len) {
1068                super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_BULK_PARAM, filter,
1069                        RestConstants.OFFSET_PARAM, Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len)));
1070            }
1071    
1072            @Override
1073            protected List<BulkResponse> call(HttpURLConnection conn) throws IOException, OozieClientException {
1074                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1075                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1076                    Reader reader = new InputStreamReader(conn.getInputStream());
1077                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1078                    JSONArray results = (JSONArray) json.get(JsonTags.BULK_RESPONSES);
1079                    if (results == null) {
1080                        results = new JSONArray();
1081                    }
1082                    return JsonToBean.createBulkResponseList(results);
1083                }
1084                else {
1085                    handleError(conn);
1086                }
1087                return null;
1088            }
1089        }
1090    
1091        private class CoordRerun extends ClientCallable<List<CoordinatorAction>> {
1092    
1093            CoordRerun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
1094                super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1095                        RestConstants.JOB_COORD_ACTION_RERUN, RestConstants.JOB_COORD_RERUN_TYPE_PARAM, rerunType,
1096                        RestConstants.JOB_COORD_RERUN_SCOPE_PARAM, scope, RestConstants.JOB_COORD_RERUN_REFRESH_PARAM,
1097                        Boolean.toString(refresh), RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean
1098                                .toString(noCleanup)));
1099            }
1100    
1101            @Override
1102            protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException {
1103                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1104                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1105                    Reader reader = new InputStreamReader(conn.getInputStream());
1106                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1107                    JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS);
1108                    return JsonToBean.createCoordinatorActionList(coordActions);
1109                }
1110                else {
1111                    handleError(conn);
1112                }
1113                return null;
1114            }
1115        }
1116    
1117        private class BundleRerun extends ClientCallable<Void> {
1118    
1119            BundleRerun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
1120                super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1121                        RestConstants.JOB_BUNDLE_ACTION_RERUN, RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM,
1122                        coordScope, RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM, dateScope,
1123                        RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, Boolean.toString(refresh),
1124                        RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean.toString(noCleanup)));
1125            }
1126    
1127            @Override
1128            protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1129                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1130                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1131                    return null;
1132                }
1133                else {
1134                    handleError(conn);
1135                }
1136                return null;
1137            }
1138        }
1139    
1140        /**
1141         * Rerun coordinator actions.
1142         *
1143         * @param jobId coordinator jobId
1144         * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used
1145         * @param scope rerun scope for date or actionIds
1146         * @param refresh true if -refresh is given in command option
1147         * @param noCleanup true if -nocleanup is given in command option
1148         * @throws OozieClientException
1149         */
1150        public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh,
1151                boolean noCleanup) throws OozieClientException {
1152            return new CoordRerun(jobId, rerunType, scope, refresh, noCleanup).call();
1153        }
1154    
1155        /**
1156         * Rerun bundle coordinators.
1157         *
1158         * @param jobId bundle jobId
1159         * @param coordScope rerun scope for coordinator jobs
1160         * @param dateScope rerun scope for date
1161         * @param refresh true if -refresh is given in command option
1162         * @param noCleanup true if -nocleanup is given in command option
1163         * @throws OozieClientException
1164         */
1165        public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
1166                throws OozieClientException {
1167            return new BundleRerun(jobId, coordScope, dateScope, refresh, noCleanup).call();
1168        }
1169    
1170        /**
1171         * Return the info of the workflow jobs that match the filter.
1172         *
1173         * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1174         * @param start jobs offset, base 1.
1175         * @param len number of jobs to return.
1176         * @return a list with the workflow jobs info, without node details.
1177         * @throws OozieClientException thrown if the jobs info could not be retrieved.
1178         */
1179        public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException {
1180            return new JobsStatus(filter, start, len).call();
1181        }
1182    
1183        /**
1184         * Return the info of the workflow jobs that match the filter.
1185         * <p/>
1186         * It returns the first 100 jobs that match the filter.
1187         *
1188         * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1189         * @return a list with the workflow jobs info, without node details.
1190         * @throws OozieClientException thrown if the jobs info could not be retrieved.
1191         */
1192        public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException {
1193            return getJobsInfo(filter, 1, 50);
1194        }
1195    
1196        /**
1197         * Print sla info about coordinator and workflow jobs and actions.
1198         *
1199         * @param start starting offset
1200         * @param len number of results
1201         * @throws OozieClientException
1202         */
1203        public void getSlaInfo(int start, int len) throws OozieClientException {
1204            new SlaInfo(start, len).call();
1205        }
1206    
1207        private class SlaInfo extends ClientCallable<Void> {
1208    
1209            SlaInfo(int start, int len) {
1210                super("GET", RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID,
1211                        Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len)));
1212            }
1213    
1214            @Override
1215            @SuppressWarnings("unchecked")
1216            protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1217                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1218                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1219                    BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
1220                    String line = null;
1221                    while ((line = br.readLine()) != null) {
1222                        System.out.println(line);
1223                    }
1224                }
1225                else {
1226                    handleError(conn);
1227                }
1228                return null;
1229            }
1230        }
1231    
1232        private class JobIdAction extends ClientCallable<String> {
1233    
1234            JobIdAction(String externalId) {
1235                super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, "wf",
1236                        RestConstants.JOBS_EXTERNAL_ID_PARAM, externalId));
1237            }
1238    
1239            @Override
1240            @SuppressWarnings("unchecked")
1241            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
1242                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1243                    Reader reader = new InputStreamReader(conn.getInputStream());
1244                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1245                    return (String) json.get(JsonTags.JOB_ID);
1246                }
1247                else {
1248                    handleError(conn);
1249                }
1250                return null;
1251            }
1252        }
1253    
1254        /**
1255         * Return the workflow job Id for an external Id.
1256         * <p/>
1257         * The external Id must have provided at job creation time.
1258         *
1259         * @param externalId external Id given at job creation time.
1260         * @return the workflow job Id for an external Id, <code>null</code> if none.
1261         * @throws OozieClientException thrown if the operation could not be done.
1262         */
1263        public String getJobId(String externalId) throws OozieClientException {
1264            return new JobIdAction(externalId).call();
1265        }
1266    
1267        private class SetSystemMode extends ClientCallable<Void> {
1268    
1269            public SetSystemMode(SYSTEM_MODE status) {
1270                super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams(
1271                        RestConstants.ADMIN_SYSTEM_MODE_PARAM, status + ""));
1272            }
1273    
1274            @Override
1275            public Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1276                if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
1277                    handleError(conn);
1278                }
1279                return null;
1280            }
1281        }
1282    
1283        /**
1284         * Enable or disable safe mode. Used by OozieCLI. In safe mode, Oozie would not accept any commands except status
1285         * command to change and view the safe mode status.
1286         *
1287         * @param status true to enable safe mode, false to disable safe mode.
1288         * @throws OozieClientException if it fails to set the safe mode status.
1289         */
1290        public void setSystemMode(SYSTEM_MODE status) throws OozieClientException {
1291            new SetSystemMode(status).call();
1292        }
1293    
1294        private class GetSystemMode extends ClientCallable<SYSTEM_MODE> {
1295    
1296            GetSystemMode() {
1297                super("GET", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams());
1298            }
1299    
1300            @Override
1301            protected SYSTEM_MODE call(HttpURLConnection conn) throws IOException, OozieClientException {
1302                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1303                    Reader reader = new InputStreamReader(conn.getInputStream());
1304                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1305                    return SYSTEM_MODE.valueOf((String) json.get(JsonTags.OOZIE_SYSTEM_MODE));
1306                }
1307                else {
1308                    handleError(conn);
1309                }
1310                return SYSTEM_MODE.NORMAL;
1311            }
1312        }
1313    
1314        /**
1315         * Returns if Oozie is in safe mode or not.
1316         *
1317         * @return true if safe mode is ON<br>
1318         *         false if safe mode is OFF
1319         * @throws OozieClientException throw if it could not obtain the safe mode status.
1320         */
1321        /*
1322         * public boolean isInSafeMode() throws OozieClientException { return new GetSafeMode().call(); }
1323         */
1324        public SYSTEM_MODE getSystemMode() throws OozieClientException {
1325            return new GetSystemMode().call();
1326        }
1327    
1328        private class GetBuildVersion extends ClientCallable<String> {
1329    
1330            GetBuildVersion() {
1331                super("GET", RestConstants.ADMIN, RestConstants.ADMIN_BUILD_VERSION_RESOURCE, prepareParams());
1332            }
1333    
1334            @Override
1335            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
1336                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1337                    Reader reader = new InputStreamReader(conn.getInputStream());
1338                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1339                    return (String) json.get(JsonTags.BUILD_VERSION);
1340                }
1341                else {
1342                    handleError(conn);
1343                }
1344                return null;
1345            }
1346        }
1347    
1348        /**
1349         * Return the Oozie server build version.
1350         *
1351         * @return the Oozie server build version.
1352         * @throws OozieClientException throw if it the server build version could not be retrieved.
1353         */
1354        public String getServerBuildVersion() throws OozieClientException {
1355            return new GetBuildVersion().call();
1356        }
1357    
1358        /**
1359         * Return the Oozie client build version.
1360         *
1361         * @return the Oozie client build version.
1362         */
1363        public String getClientBuildVersion() {
1364            return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION);
1365        }
1366    
1367        /**
1368         * Return the info of the coordinator jobs that match the filter.
1369         *
1370         * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1371         * @param start jobs offset, base 1.
1372         * @param len number of jobs to return.
1373         * @return a list with the coordinator jobs info
1374         * @throws OozieClientException thrown if the jobs info could not be retrieved.
1375         */
1376        public List<CoordinatorJob> getCoordJobsInfo(String filter, int start, int len) throws OozieClientException {
1377            return new CoordJobsStatus(filter, start, len).call();
1378        }
1379    
1380        /**
1381         * Return the info of the bundle jobs that match the filter.
1382         *
1383         * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1384         * @param start jobs offset, base 1.
1385         * @param len number of jobs to return.
1386         * @return a list with the bundle jobs info
1387         * @throws OozieClientException thrown if the jobs info could not be retrieved.
1388         */
1389        public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException {
1390            return new BundleJobsStatus(filter, start, len).call();
1391        }
1392    
1393        public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException {
1394            return new BulkResponseStatus(filter, start, len).call();
1395        }
1396    
1397        private class GetQueueDump extends ClientCallable<List<String>> {
1398            GetQueueDump() {
1399                super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams());
1400            }
1401    
1402            @Override
1403            protected List<String> call(HttpURLConnection conn) throws IOException, OozieClientException {
1404                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1405                    Reader reader = new InputStreamReader(conn.getInputStream());
1406                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1407                    JSONArray queueDumpArray = (JSONArray) json.get(JsonTags.QUEUE_DUMP);
1408    
1409                    List<String> list = new ArrayList<String>();
1410                    list.add("[Server Queue Dump]:");
1411                    for (Object o : queueDumpArray) {
1412                        JSONObject entry = (JSONObject) o;
1413                        if (entry.get(JsonTags.CALLABLE_DUMP) != null) {
1414                            String value = (String) entry.get(JsonTags.CALLABLE_DUMP);
1415                            list.add(value);
1416                        }
1417                    }
1418                    if (queueDumpArray.size() == 0) {
1419                        list.add("Queue dump is null!");
1420                    }
1421    
1422                    list.add("******************************************");
1423                    list.add("[Server Uniqueness Map Dump]:");
1424    
1425                    JSONArray uniqueDumpArray = (JSONArray) json.get(JsonTags.UNIQUE_MAP_DUMP);
1426                    for (Object o : uniqueDumpArray) {
1427                        JSONObject entry = (JSONObject) o;
1428                        if (entry.get(JsonTags.UNIQUE_ENTRY_DUMP) != null) {
1429                            String value = (String) entry.get(JsonTags.UNIQUE_ENTRY_DUMP);
1430                            list.add(value);
1431                        }
1432                    }
1433                    if (uniqueDumpArray.size() == 0) {
1434                        list.add("Uniqueness dump is null!");
1435                    }
1436                    return list;
1437                }
1438                else {
1439                    handleError(conn);
1440                }
1441                return null;
1442            }
1443        }
1444    
1445        /**
1446         * Return the Oozie queue's commands' dump
1447         *
1448         * @return the list of strings of callable identification in queue
1449         * @throws OozieClientException throw if it the queue dump could not be retrieved.
1450         */
1451        public List<String> getQueueDump() throws OozieClientException {
1452            return new GetQueueDump().call();
1453        }
1454    
1455        /**
1456         * Check if the string is not null or not empty.
1457         *
1458         * @param str
1459         * @param name
1460         * @return string
1461         */
1462        public static String notEmpty(String str, String name) {
1463            if (str == null) {
1464                throw new IllegalArgumentException(name + " cannot be null");
1465            }
1466            if (str.length() == 0) {
1467                throw new IllegalArgumentException(name + " cannot be empty");
1468            }
1469            return str;
1470        }
1471    
1472        /**
1473         * Check if the object is not null.
1474         *
1475         * @param <T>
1476         * @param obj
1477         * @param name
1478         * @return string
1479         */
1480        public static <T> T notNull(T obj, String name) {
1481            if (obj == null) {
1482                throw new IllegalArgumentException(name + " cannot be null");
1483            }
1484            return obj;
1485        }
1486    
1487    }