001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.client;
019    
020    import java.io.BufferedReader;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.InputStreamReader;
024    import java.io.OutputStream;
025    import java.io.PrintStream;
026    import java.io.Reader;
027    import java.net.HttpURLConnection;
028    import java.net.URL;
029    import java.net.URLEncoder;
030    import java.util.ArrayList;
031    import java.util.Collections;
032    import java.util.HashMap;
033    import java.util.Iterator;
034    import java.util.LinkedHashMap;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.Properties;
038    import java.util.concurrent.Callable;
039    
040    import javax.xml.parsers.DocumentBuilderFactory;
041    import javax.xml.transform.Transformer;
042    import javax.xml.transform.TransformerFactory;
043    import javax.xml.transform.dom.DOMSource;
044    import javax.xml.transform.stream.StreamResult;
045    
046    import org.apache.oozie.BuildInfo;
047    import org.apache.oozie.client.rest.JsonTags;
048    import org.apache.oozie.client.rest.JsonToBean;
049    import org.apache.oozie.client.rest.RestConstants;
050    import org.json.simple.JSONArray;
051    import org.json.simple.JSONObject;
052    import org.json.simple.JSONValue;
053    import org.w3c.dom.Document;
054    import org.w3c.dom.Element;
055    
/**
 * Client API to submit and manage Oozie workflow jobs against an Oozie instance.
 * <p/>
 * This class is thread safe.
 * <p/>
 * Filter syntax for the {@link #getJobsInfo(String)} and {@link #getJobsInfo(String, int, int)} methods:
 * <code>[NAME=VALUE][;NAME=VALUE]*</code>.
 * <p/>
 * Valid filter names are:
 * <p/>
 * <ul>
 * <li>name: the workflow application name from the workflow definition.</li>
 * <li>user: the user that submitted the job.</li>
 * <li>group: the group for the job.</li>
 * <li>status: the status of the job.</li>
 * </ul>
 * <p/>
 * The query will do an AND among all the filter names. The query will do an OR among all the filter values for the same
 * name. Multiple values must be specified as different name value pairs.
 */
076    public class OozieClient {
077    
    /** Web-services protocol version 0; used by {@link #validateWSVersion()} as the last fallback. */
    public static final long WS_PROTOCOL_VERSION_0 = 0;

    /** Web-services protocol version 1; used by {@link #validateWSVersion()} as the second choice. */
    public static final long WS_PROTOCOL_VERSION_1 = 1;

    /** Web-services protocol version preferred by this client. */
    public static final long WS_PROTOCOL_VERSION = 2; // pointer to current version

    /** Configuration property that {@link #createConfiguration()} sets to the effective user name. */
    public static final String USER_NAME = "user.name";

    /** Group name property; deprecated (no replacement is indicated in this part of the file). */
    @Deprecated
    public static final String GROUP_NAME = "group.name";

    // NOTE(review): the property names below are presumably keys of the job configuration
    // handed to submit()/run()/reRun() -- confirm against the server documentation.

    public static final String JOB_ACL = "oozie.job.acl";

    public static final String APP_PATH = "oozie.wf.application.path";

    public static final String COORDINATOR_APP_PATH = "oozie.coord.application.path";

    public static final String BUNDLE_APP_PATH = "oozie.bundle.application.path";

    public static final String BUNDLE_ID = "oozie.bundle.id";

    public static final String EXTERNAL_ID = "oozie.wf.external.id";

    public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url";

    public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url";

    public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url";

    public static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes";

    public static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes";

    public static final String LOG_TOKEN = "oozie.wf.log.token";

    public static final String ACTION_MAX_RETRIES = "oozie.wf.action.max.retries";

    public static final String ACTION_RETRY_INTERVAL = "oozie.wf.action.retry.interval";

    // Filter names. The class documentation lists name, user, group and status as the valid
    // names for the getJobsInfo filters; the remaining names are presumably for other
    // queries -- TODO confirm where each one is accepted.

    public static final String FILTER_USER = "user";

    public static final String FILTER_GROUP = "group";

    public static final String FILTER_NAME = "name";

    public static final String FILTER_STATUS = "status";

    public static final String FILTER_FREQUENCY = "frequency";

    public static final String FILTER_ID = "id";

    public static final String FILTER_UNIT = "unit";

    public static final String FILTER_JOBID = "jobid";

    public static final String FILTER_APPNAME = "appname";

    public static final String FILTER_SLA_APPNAME = "app_name";

    // Same literal value as FILTER_ID.
    public static final String FILTER_SLA_ID = "id";

    public static final String FILTER_SLA_PARENT_ID = "parent_id";

    public static final String FILTER_SLA_NOMINAL_START = "nominal_start";

    public static final String FILTER_SLA_NOMINAL_END = "nominal_end";

    // NOTE(review): presumably the names accepted inside the changeValue argument of
    // change(String, String) -- confirm against the server documentation.

    public static final String CHANGE_VALUE_ENDTIME = "endtime";

    public static final String CHANGE_VALUE_PAUSETIME = "pausetime";

    public static final String CHANGE_VALUE_CONCURRENCY = "concurrency";

    public static final String LIBPATH = "oozie.libpath";

    public static final String USE_SYSTEM_LIBPATH = "oozie.use.system.libpath";

    public static final String OOZIE_SUSPEND_ON_NODES = "oozie.suspend.on.nodes";
156    
157        public static enum SYSTEM_MODE {
158            NORMAL, NOWEBSERVICE, SAFEMODE
159        };
160    
161        /**
162         * debugMode =0 means no debugging. > 0 means debugging on.
163         */
164        public int debugMode = 0;
165    
166        private String baseUrl;
167        private String protocolUrl;
168        private boolean validatedVersion = false;
169        private JSONArray supportedVersions;
170        private final Map<String, String> headers = new HashMap<String, String>();
171    
172        private static ThreadLocal<String> USER_NAME_TL = new ThreadLocal<String>();
173    
174        /**
175         * Allows to impersonate other users in the Oozie server. The current user
176         * must be configured as a proxyuser in Oozie.
177         * <p/>
178         * IMPORTANT: impersonation happens only with Oozie client requests done within
179         * doAs() calls.
180         *
181         * @param userName user to impersonate.
182         * @param callable callable with {@link OozieClient} calls impersonating the specified user.
183         * @return any response returned by the {@link Callable#call()} method.
184         * @throws Exception thrown by the {@link Callable#call()} method.
185         */
186        public static <T> T doAs(String userName, Callable<T> callable) throws Exception {
187            notEmpty(userName, "userName");
188            notNull(callable, "callable");
189            try {
190                USER_NAME_TL.set(userName);
191                return callable.call();
192            }
193            finally {
194                USER_NAME_TL.remove();
195            }
196        }
197    
    /**
     * Create a client instance without an Oozie URL.
     * <p/>
     * This constructor assigns no fields, so the base URL stays <code>null</code>.
     */
    protected OozieClient() {
    }
200    
201        /**
202         * Create a Workflow client instance.
203         *
204         * @param oozieUrl URL of the Oozie instance it will interact with.
205         */
206        public OozieClient(String oozieUrl) {
207            this.baseUrl = notEmpty(oozieUrl, "oozieUrl");
208            if (!this.baseUrl.endsWith("/")) {
209                this.baseUrl += "/";
210            }
211        }
212    
213        /**
214         * Return the Oozie URL of the workflow client instance.
215         * <p/>
216         * This URL is the base URL fo the Oozie system, with not protocol versioning.
217         *
218         * @return the Oozie URL of the workflow client instance.
219         */
220        public String getOozieUrl() {
221            return baseUrl;
222        }
223    
224        /**
225         * Return the Oozie URL used by the client and server for WS communications.
226         * <p/>
227         * This URL is the original URL plus the versioning element path.
228         *
229         * @return the Oozie URL used by the client and server for communication.
230         * @throws OozieClientException thrown in the client and the server are not protocol compatible.
231         */
232        public String getProtocolUrl() throws OozieClientException {
233            validateWSVersion();
234            return protocolUrl;
235        }
236    
237        /**
238         * @return current debug Mode
239         */
240        public int getDebugMode() {
241            return debugMode;
242        }
243    
    /**
     * Set debug mode.
     * <p/>
     * The value is stored as given; no range check is applied.
     *
     * @param debugMode : 0 means no debugging. > 0 means debugging
     */
    public void setDebugMode(int debugMode) {
        this.debugMode = debugMode;
    }
252    
253        private String getBaseURLForVersion(long protocolVersion) throws OozieClientException {
254            try {
255                if (supportedVersions == null) {
256                    supportedVersions = getSupportedProtocolVersions();
257                }
258                if (supportedVersions == null) {
259                    throw new OozieClientException("HTTP error", "no response message");
260                }
261                if (supportedVersions.contains(protocolVersion)) {
262                    return baseUrl + "v" + protocolVersion + "/";
263                }
264                else {
265                    throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, "Protocol version "
266                            + protocolVersion + " is not supported");
267                }
268            }
269            catch (IOException e) {
270                throw new OozieClientException(OozieClientException.IO_ERROR, e);
271            }
272        }
273    
274        /**
275         * Validate that the Oozie client and server instances are protocol compatible.
276         *
277         * @throws OozieClientException thrown in the client and the server are not protocol compatible.
278         */
279        public synchronized void validateWSVersion() throws OozieClientException {
280            if (!validatedVersion) {
281                try {
282                    supportedVersions = getSupportedProtocolVersions();
283                    if (supportedVersions == null) {
284                        throw new OozieClientException("HTTP error", "no response message");
285                    }
286                    if (!supportedVersions.contains(WS_PROTOCOL_VERSION)
287                            && !supportedVersions.contains(WS_PROTOCOL_VERSION_1)
288                            && !supportedVersions.contains(WS_PROTOCOL_VERSION_0)) {
289                        StringBuilder msg = new StringBuilder();
290                        msg.append("Supported version [").append(WS_PROTOCOL_VERSION)
291                                .append("] or less, Unsupported versions[");
292                        String separator = "";
293                        for (Object version : supportedVersions) {
294                            msg.append(separator).append(version);
295                        }
296                        msg.append("]");
297                        throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, msg.toString());
298                    }
299                    if (supportedVersions.contains(WS_PROTOCOL_VERSION)) {
300                        protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION + "/";
301                    }
302                    else if (supportedVersions.contains(WS_PROTOCOL_VERSION_1)) {
303                        protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_1 + "/";
304                    }
305                    else {
306                        if (supportedVersions.contains(WS_PROTOCOL_VERSION_0)) {
307                            protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_0 + "/";
308                        }
309                    }
310                }
311                catch (IOException ex) {
312                    throw new OozieClientException(OozieClientException.IO_ERROR, ex);
313                }
314                validatedVersion = true;
315            }
316        }
317    
318        private JSONArray getSupportedProtocolVersions() throws IOException, OozieClientException {
319            JSONArray versions = null;
320            URL url = new URL(baseUrl + RestConstants.VERSIONS);
321            HttpURLConnection conn = createConnection(url, "GET");
322            if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
323                versions = (JSONArray) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
324            }
325            else {
326                handleError(conn);
327            }
328            return versions;
329        }
330    
331        /**
332         * Create an empty configuration with just the {@link #USER_NAME} set to the JVM user name.
333         *
334         * @return an empty configuration.
335         */
336        public Properties createConfiguration() {
337            Properties conf = new Properties();
338            String userName = USER_NAME_TL.get();
339            if (userName == null) {
340                userName = System.getProperty("user.name");
341            }
342            conf.setProperty(USER_NAME, userName);
343            return conf;
344        }
345    
346        /**
347         * Set a HTTP header to be used in the WS requests by the workflow instance.
348         *
349         * @param name header name.
350         * @param value header value.
351         */
352        public void setHeader(String name, String value) {
353            headers.put(notEmpty(name, "name"), notNull(value, "value"));
354        }
355    
356        /**
357         * Get the value of a set HTTP header from the workflow instance.
358         *
359         * @param name header name.
360         * @return header value, <code>null</code> if not set.
361         */
362        public String getHeader(String name) {
363            return headers.get(notEmpty(name, "name"));
364        }
365    
366        /**
367         * Get the set HTTP header
368         *
369         * @return map of header key and value
370         */
371        public Map<String, String> getHeaders() {
372            return headers;
373        }
374    
375        /**
376         * Remove a HTTP header from the workflow client instance.
377         *
378         * @param name header name.
379         */
380        public void removeHeader(String name) {
381            headers.remove(notEmpty(name, "name"));
382        }
383    
384        /**
385         * Return an iterator with all the header names set in the workflow instance.
386         *
387         * @return header names.
388         */
389        public Iterator<String> getHeaderNames() {
390            return Collections.unmodifiableMap(headers).keySet().iterator();
391        }
392    
393        private URL createURL(Long protocolVersion, String collection, String resource, Map<String, String> parameters)
394                throws IOException, OozieClientException {
395            validateWSVersion();
396            StringBuilder sb = new StringBuilder();
397            if (protocolVersion == null) {
398                sb.append(protocolUrl);
399            }
400            else {
401                sb.append(getBaseURLForVersion(protocolVersion));
402            }
403            sb.append(collection);
404            if (resource != null && resource.length() > 0) {
405                sb.append("/").append(resource);
406            }
407            if (parameters.size() > 0) {
408                String separator = "?";
409                for (Map.Entry<String, String> param : parameters.entrySet()) {
410                    if (param.getValue() != null) {
411                        sb.append(separator).append(URLEncoder.encode(param.getKey(), "UTF-8")).append("=").append(
412                                URLEncoder.encode(param.getValue(), "UTF-8"));
413                        separator = "&";
414                    }
415                }
416            }
417            return new URL(sb.toString());
418        }
419    
420        private boolean validateCommand(String url) {
421            {
422                if (protocolUrl.contains(baseUrl + "v0")) {
423                    if (url.contains("dryrun") || url.contains("jobtype=c") || url.contains("systemmode")) {
424                        return false;
425                    }
426                }
427            }
428            return true;
429        }
430    
431        /**
432         * Create http connection to oozie server.
433         *
434         * @param url
435         * @param method
436         * @return connection
437         * @throws IOException
438         * @throws OozieClientException
439         */
440        protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException {
441            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
442            conn.setRequestMethod(method);
443            if (method.equals("POST") || method.equals("PUT")) {
444                conn.setDoOutput(true);
445            }
446            for (Map.Entry<String, String> header : headers.entrySet()) {
447                conn.setRequestProperty(header.getKey(), header.getValue());
448            }
449            return conn;
450        }
451    
452        protected abstract class ClientCallable<T> implements Callable<T> {
453            private final String method;
454            private final String collection;
455            private final String resource;
456            private final Map<String, String> params;
457            private final Long protocolVersion;
458    
459            public ClientCallable(String method, String collection, String resource, Map<String, String> params) {
460                this(method, null, collection, resource, params);
461            }
462    
463            public ClientCallable(String method, Long protocolVersion, String collection, String resource, Map<String, String> params) {
464                this.method = method;
465                this.protocolVersion = protocolVersion;
466                this.collection = collection;
467                this.resource = resource;
468                this.params = params;
469            }
470    
471            public T call() throws OozieClientException {
472                try {
473                    URL url = createURL(protocolVersion, collection, resource, params);
474                    if (validateCommand(url.toString())) {
475                        if (getDebugMode() > 0) {
476                            System.out.println(method + " " + url);
477                        }
478                        HttpURLConnection conn = createConnection(url, method);
479                        return call(conn);
480                    }
481                    else {
482                        System.out.println("Option not supported in target server. Supported only on Oozie-2.0 or greater."
483                                + " Use 'oozie help' for details");
484                        throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, new Exception());
485                    }
486                }
487                catch (IOException ex) {
488                    throw new OozieClientException(OozieClientException.IO_ERROR, ex);
489                }
490    
491            }
492    
493            protected abstract T call(HttpURLConnection conn) throws IOException, OozieClientException;
494        }
495    
496        static void handleError(HttpURLConnection conn) throws IOException, OozieClientException {
497            int status = conn.getResponseCode();
498            String error = conn.getHeaderField(RestConstants.OOZIE_ERROR_CODE);
499            String message = conn.getHeaderField(RestConstants.OOZIE_ERROR_MESSAGE);
500    
501            if (error == null) {
502                error = "HTTP error code: " + status;
503            }
504    
505            if (message == null) {
506                message = conn.getResponseMessage();
507            }
508            throw new OozieClientException(error, message);
509        }
510    
511        static Map<String, String> prepareParams(String... params) {
512            Map<String, String> map = new LinkedHashMap<String, String>();
513            for (int i = 0; i < params.length; i = i + 2) {
514                map.put(params[i], params[i + 1]);
515            }
516            String doAsUserName = USER_NAME_TL.get();
517            if (doAsUserName != null) {
518                map.put(RestConstants.DO_AS_PARAM, doAsUserName);
519            }
520            return map;
521        }
522    
523        public void writeToXml(Properties props, OutputStream out) throws IOException {
524            try {
525                Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
526                Element conf = doc.createElement("configuration");
527                doc.appendChild(conf);
528                conf.appendChild(doc.createTextNode("\n"));
529                for (String name : props.stringPropertyNames()) { // Properties whose key or value is not of type String are omitted.
530                    String value = props.getProperty(name);
531                    Element propNode = doc.createElement("property");
532                    conf.appendChild(propNode);
533    
534                    Element nameNode = doc.createElement("name");
535                    nameNode.appendChild(doc.createTextNode(name.trim()));
536                    propNode.appendChild(nameNode);
537    
538                    Element valueNode = doc.createElement("value");
539                    valueNode.appendChild(doc.createTextNode(value.trim()));
540                    propNode.appendChild(valueNode);
541    
542                    conf.appendChild(doc.createTextNode("\n"));
543                }
544    
545                DOMSource source = new DOMSource(doc);
546                StreamResult result = new StreamResult(out);
547                TransformerFactory transFactory = TransformerFactory.newInstance();
548                Transformer transformer = transFactory.newTransformer();
549                transformer.transform(source, result);
550                if (getDebugMode() > 0) {
551                    result = new StreamResult(System.out);
552                    transformer.transform(source, result);
553                    System.out.println();
554                }
555            }
556            catch (Exception e) {
557                throw new IOException(e);
558            }
559        }
560    
561        private class JobSubmit extends ClientCallable<String> {
562            private final Properties conf;
563    
564            JobSubmit(Properties conf, boolean start) {
565                super("POST", RestConstants.JOBS, "", (start) ? prepareParams(RestConstants.ACTION_PARAM,
566                        RestConstants.JOB_ACTION_START) : prepareParams());
567                this.conf = notNull(conf, "conf");
568            }
569    
570            JobSubmit(String jobId, Properties conf) {
571                super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
572                        RestConstants.JOB_ACTION_RERUN));
573                this.conf = notNull(conf, "conf");
574            }
575    
576            public JobSubmit(Properties conf, String jobActionDryrun) {
577                super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.ACTION_PARAM,
578                        RestConstants.JOB_ACTION_DRYRUN));
579                this.conf = notNull(conf, "conf");
580            }
581    
582            @Override
583            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
584                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
585                writeToXml(conf, conn.getOutputStream());
586                if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
587                    JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
588                    return (String) json.get(JsonTags.JOB_ID);
589                }
590                if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
591                    handleError(conn);
592                }
593                return null;
594            }
595        }
596    
597        /**
598         * Submit a workflow job.
599         *
600         * @param conf job configuration.
601         * @return the job Id.
602         * @throws OozieClientException thrown if the job could not be submitted.
603         */
604        public String submit(Properties conf) throws OozieClientException {
605            return (new JobSubmit(conf, false)).call();
606        }
607    
608        private class JobAction extends ClientCallable<Void> {
609    
610            JobAction(String jobId, String action) {
611                super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action));
612            }
613    
614            JobAction(String jobId, String action, String params) {
615                super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action,
616                        RestConstants.JOB_CHANGE_VALUE, params));
617            }
618    
619            @Override
620            protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
621                if (!(conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
622                    handleError(conn);
623                }
624                return null;
625            }
626        }
627    
628        /**
629         * dryrun for a given job
630         *
631         * @param conf Job configuration.
632         */
633        public String dryrun(Properties conf) throws OozieClientException {
634            return new JobSubmit(conf, RestConstants.JOB_ACTION_DRYRUN).call();
635        }
636    
637        /**
638         * Start a workflow job.
639         *
640         * @param jobId job Id.
641         * @throws OozieClientException thrown if the job could not be started.
642         */
643        public void start(String jobId) throws OozieClientException {
644            new JobAction(jobId, RestConstants.JOB_ACTION_START).call();
645        }
646    
647        /**
648         * Submit and start a workflow job.
649         *
650         * @param conf job configuration.
651         * @return the job Id.
652         * @throws OozieClientException thrown if the job could not be submitted.
653         */
654        public String run(Properties conf) throws OozieClientException {
655            return (new JobSubmit(conf, true)).call();
656        }
657    
658        /**
659         * Rerun a workflow job.
660         *
661         * @param jobId job Id to rerun.
662         * @param conf configuration information for the rerun.
663         * @throws OozieClientException thrown if the job could not be started.
664         */
665        public void reRun(String jobId, Properties conf) throws OozieClientException {
666            new JobSubmit(jobId, conf).call();
667        }
668    
669        /**
670         * Suspend a workflow job.
671         *
672         * @param jobId job Id.
673         * @throws OozieClientException thrown if the job could not be suspended.
674         */
675        public void suspend(String jobId) throws OozieClientException {
676            new JobAction(jobId, RestConstants.JOB_ACTION_SUSPEND).call();
677        }
678    
679        /**
680         * Resume a workflow job.
681         *
682         * @param jobId job Id.
683         * @throws OozieClientException thrown if the job could not be resume.
684         */
685        public void resume(String jobId) throws OozieClientException {
686            new JobAction(jobId, RestConstants.JOB_ACTION_RESUME).call();
687        }
688    
689        /**
690         * Kill a workflow job.
691         *
692         * @param jobId job Id.
693         * @throws OozieClientException thrown if the job could not be killed.
694         */
695        public void kill(String jobId) throws OozieClientException {
696            new JobAction(jobId, RestConstants.JOB_ACTION_KILL).call();
697        }
698    
699        /**
700         * Change a coordinator job.
701         *
702         * @param jobId job Id.
703         * @param changeValue change value.
704         * @throws OozieClientException thrown if the job could not be changed.
705         */
706        public void change(String jobId, String changeValue) throws OozieClientException {
707            new JobAction(jobId, RestConstants.JOB_ACTION_CHANGE, changeValue).call();
708        }
709    
710        private class JobInfo extends ClientCallable<WorkflowJob> {
711    
712            JobInfo(String jobId, int start, int len) {
713                super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
714                        RestConstants.JOB_SHOW_INFO, RestConstants.OFFSET_PARAM, Integer.toString(start),
715                        RestConstants.LEN_PARAM, Integer.toString(len)));
716            }
717    
718            @Override
719            protected WorkflowJob call(HttpURLConnection conn) throws IOException, OozieClientException {
720                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
721                    Reader reader = new InputStreamReader(conn.getInputStream());
722                    JSONObject json = (JSONObject) JSONValue.parse(reader);
723                    return JsonToBean.createWorkflowJob(json);
724                }
725                else {
726                    handleError(conn);
727                }
728                return null;
729            }
730        }
731    
732    
733        private class JMSInfo extends ClientCallable<JMSConnectionInfo> {
734    
735            JMSInfo() {
736                super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JMS_INFO, prepareParams());
737            }
738    
739            protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException {
740                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
741                    Reader reader = new InputStreamReader(conn.getInputStream());
742                    JSONObject json = (JSONObject) JSONValue.parse(reader);
743                    return JsonToBean.createJMSConnectionInfo(json);
744                }
745                else {
746                    handleError(conn);
747                }
748                return null;
749            }
750        }
751    
752        private class WorkflowActionInfo extends ClientCallable<WorkflowAction> {
753            WorkflowActionInfo(String actionId) {
754                super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
755                        RestConstants.JOB_SHOW_INFO));
756            }
757    
758            @Override
759            protected WorkflowAction call(HttpURLConnection conn) throws IOException, OozieClientException {
760                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
761                    Reader reader = new InputStreamReader(conn.getInputStream());
762                    JSONObject json = (JSONObject) JSONValue.parse(reader);
763                    return JsonToBean.createWorkflowAction(json);
764                }
765                else {
766                    handleError(conn);
767                }
768                return null;
769            }
770        }
771    
772        /**
773         * Get the info of a workflow job.
774         *
775         * @param jobId job Id.
776         * @return the job info.
777         * @throws OozieClientException thrown if the job info could not be retrieved.
778         */
779        public WorkflowJob getJobInfo(String jobId) throws OozieClientException {
780            return getJobInfo(jobId, 0, 0);
781        }
782    
783        /**
784         * Get the JMS Connection info
785         * @return JMSConnectionInfo object
786         * @throws OozieClientException
787         */
788        public JMSConnectionInfo getJMSConnectionInfo() throws OozieClientException {
789            return new JMSInfo().call();
790        }
791    
792        /**
793         * Get the info of a workflow job and subset actions.
794         *
795         * @param jobId job Id.
796         * @param start starting index in the list of actions belonging to the job
797         * @param len number of actions to be returned
798         * @return the job info.
799         * @throws OozieClientException thrown if the job info could not be retrieved.
800         */
801        public WorkflowJob getJobInfo(String jobId, int start, int len) throws OozieClientException {
802            return new JobInfo(jobId, start, len).call();
803        }
804    
805        /**
806         * Get the info of a workflow action.
807         *
808         * @param actionId Id.
809         * @return the workflow action info.
810         * @throws OozieClientException thrown if the job info could not be retrieved.
811         */
812        public WorkflowAction getWorkflowActionInfo(String actionId) throws OozieClientException {
813            return new WorkflowActionInfo(actionId).call();
814        }
815    
816        /**
817         * Get the log of a workflow job.
818         *
819         * @param jobId job Id.
820         * @return the job log.
821         * @throws OozieClientException thrown if the job info could not be retrieved.
822         */
823        public String getJobLog(String jobId) throws OozieClientException {
824            return new JobLog(jobId).call();
825        }
826    
827        /**
828         * Get the log of a job.
829         *
830         * @param jobId job Id.
831         * @param logRetrievalType Based on which filter criteria the log is retrieved
832         * @param logRetrievalScope Value for the retrieval type
833         * @param ps Printstream of command line interface
834         * @throws OozieClientException thrown if the job info could not be retrieved.
835         */
836        public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps)
837                throws OozieClientException {
838            new JobLog(jobId, logRetrievalType, logRetrievalScope, ps).call();
839        }
840    
841        private class JobLog extends JobMetadata {
842            JobLog(String jobId) {
843                super(jobId, RestConstants.JOB_SHOW_LOG);
844            }
845    
846            JobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) {
847                super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, ps);
848            }
849        }
850    
851        /**
852         * Gets the JMS topic name for a particular job
853         * @param jobId given jobId
854         * @return the JMS topic name
855         * @throws OozieClientException
856         */
857        public String getJMSTopicName(String jobId) throws OozieClientException {
858            return new JMSTopic(jobId).call();
859        }
860    
861        private class JMSTopic extends ClientCallable<String> {
862    
863            JMSTopic(String jobId) {
864                super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
865                        RestConstants.JOB_SHOW_JMS_TOPIC));
866            }
867    
868            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
869                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
870                    Reader reader = new InputStreamReader(conn.getInputStream());
871                    JSONObject json = (JSONObject) JSONValue.parse(reader);
872                    return (String) json.get(JsonTags.JMS_TOPIC_NAME);
873                }
874                else {
875                    handleError(conn);
876                }
877                return null;
878            }
879        }
880    
881        /**
882         * Get the definition of a workflow job.
883         *
884         * @param jobId job Id.
885         * @return the job log.
886         * @throws OozieClientException thrown if the job info could not be retrieved.
887         */
888        public String getJobDefinition(String jobId) throws OozieClientException {
889            return new JobDefinition(jobId).call();
890        }
891    
892        private class JobDefinition extends JobMetadata {
893    
894            JobDefinition(String jobId) {
895                super(jobId, RestConstants.JOB_SHOW_DEFINITION);
896            }
897        }
898    
899        private class JobMetadata extends ClientCallable<String> {
900            PrintStream printStream;
901    
902            JobMetadata(String jobId, String metaType) {
903                super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
904                        metaType));
905            }
906    
907            JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, PrintStream ps) {
908                super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
909                        metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM,
910                        logRetrievalScope));
911                printStream = ps;
912            }
913    
914            @Override
915            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
916                String returnVal = null;
917                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
918                    InputStream is = conn.getInputStream();
919                    InputStreamReader isr = new InputStreamReader(is);
920                    try {
921                        if (printStream != null) {
922                            sendToOutputStream(isr, -1);
923                        }
924                        else {
925                            returnVal = getReaderAsString(isr, -1);
926                        }
927                    }
928                    finally {
929                        isr.close();
930                    }
931                }
932                else {
933                    handleError(conn);
934                }
935                return returnVal;
936            }
937    
938            /**
939             * Output the log to command line interface
940             *
941             * @param reader reader to read into a string.
942             * @param maxLen max content length allowed, if -1 there is no limit.
943             * @param ps Printstream of command line interface
944             * @throws IOException
945             */
946            private void sendToOutputStream(Reader reader, int maxLen) throws IOException {
947                if (reader == null) {
948                    throw new IllegalArgumentException("reader cannot be null");
949                }
950                StringBuilder sb = new StringBuilder();
951                char[] buffer = new char[2048];
952                int read;
953                int count = 0;
954                int noOfCharstoFlush = 1024;
955                while ((read = reader.read(buffer)) > -1) {
956                    count += read;
957                    if ((maxLen > -1) && (count > maxLen)) {
958                        break;
959                    }
960                    sb.append(buffer, 0, read);
961                    if (sb.length() > noOfCharstoFlush) {
962                        printStream.print(sb.toString());
963                        sb = new StringBuilder("");
964                    }
965                }
966                printStream.print(sb.toString());
967            }
968    
969            /**
970             * Return a reader as string.
971             * <p/>
972             *
973             * @param reader reader to read into a string.
974             * @param maxLen max content length allowed, if -1 there is no limit.
975             * @return the reader content.
976             * @throws IOException thrown if the resource could not be read.
977             */
978            private String getReaderAsString(Reader reader, int maxLen) throws IOException {
979                if (reader == null) {
980                    throw new IllegalArgumentException("reader cannot be null");
981                }
982                StringBuffer sb = new StringBuffer();
983                char[] buffer = new char[2048];
984                int read;
985                int count = 0;
986                while ((read = reader.read(buffer)) > -1) {
987                    count += read;
988    
989                    // read up to maxLen chars;
990                    if ((maxLen > -1) && (count > maxLen)) {
991                        break;
992                    }
993                    sb.append(buffer, 0, read);
994                }
995                reader.close();
996                return sb.toString();
997            }
998        }
999    
1000        private class CoordJobInfo extends ClientCallable<CoordinatorJob> {
1001    
1002            CoordJobInfo(String jobId, String filter, int start, int len) {
1003                super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1004                        RestConstants.JOB_SHOW_INFO, RestConstants.JOB_FILTER_PARAM, filter, RestConstants.OFFSET_PARAM, Integer.toString(start),
1005                        RestConstants.LEN_PARAM, Integer.toString(len)));
1006            }
1007    
1008            @Override
1009            protected CoordinatorJob call(HttpURLConnection conn) throws IOException, OozieClientException {
1010                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1011                    Reader reader = new InputStreamReader(conn.getInputStream());
1012                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1013                    return JsonToBean.createCoordinatorJob(json);
1014                }
1015                else {
1016                    handleError(conn);
1017                }
1018                return null;
1019            }
1020        }
1021    
1022        private class BundleJobInfo extends ClientCallable<BundleJob> {
1023    
1024            BundleJobInfo(String jobId) {
1025                super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1026                        RestConstants.JOB_SHOW_INFO));
1027            }
1028    
1029            @Override
1030            protected BundleJob call(HttpURLConnection conn) throws IOException, OozieClientException {
1031                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1032                    Reader reader = new InputStreamReader(conn.getInputStream());
1033                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1034                    return JsonToBean.createBundleJob(json);
1035                }
1036                else {
1037                    handleError(conn);
1038                }
1039                return null;
1040            }
1041        }
1042    
1043        private class CoordActionInfo extends ClientCallable<CoordinatorAction> {
1044            CoordActionInfo(String actionId) {
1045                super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1046                        RestConstants.JOB_SHOW_INFO));
1047            }
1048    
1049            @Override
1050            protected CoordinatorAction call(HttpURLConnection conn) throws IOException, OozieClientException {
1051                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1052                    Reader reader = new InputStreamReader(conn.getInputStream());
1053                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1054                    return JsonToBean.createCoordinatorAction(json);
1055                }
1056                else {
1057                    handleError(conn);
1058                }
1059                return null;
1060            }
1061        }
1062    
1063        /**
1064         * Get the info of a bundle job.
1065         *
1066         * @param jobId job Id.
1067         * @return the job info.
1068         * @throws OozieClientException thrown if the job info could not be retrieved.
1069         */
1070        public BundleJob getBundleJobInfo(String jobId) throws OozieClientException {
1071            return new BundleJobInfo(jobId).call();
1072        }
1073    
1074        /**
1075         * Get the info of a coordinator job.
1076         *
1077         * @param jobId job Id.
1078         * @return the job info.
1079         * @throws OozieClientException thrown if the job info could not be retrieved.
1080         */
1081        public CoordinatorJob getCoordJobInfo(String jobId) throws OozieClientException {
1082            return new CoordJobInfo(jobId, null, -1, -1).call();
1083        }
1084    
1085        /**
1086         * Get the info of a coordinator job and subset actions.
1087         *
1088         * @param jobId job Id.
1089         * @param filter filter the status filter
1090         * @param start starting index in the list of actions belonging to the job
1091         * @param len number of actions to be returned
1092         * @return the job info.
1093         * @throws OozieClientException thrown if the job info could not be retrieved.
1094         */
1095        public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len) throws OozieClientException {
1096            return new CoordJobInfo(jobId, filter, start, len).call();
1097        }
1098    
1099        /**
1100         * Get the info of a coordinator action.
1101         *
1102         * @param actionId Id.
1103         * @return the coordinator action info.
1104         * @throws OozieClientException thrown if the job info could not be retrieved.
1105         */
1106        public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException {
1107            return new CoordActionInfo(actionId).call();
1108        }
1109    
1110        private class JobsStatus extends ClientCallable<List<WorkflowJob>> {
1111    
1112            JobsStatus(String filter, int start, int len) {
1113                super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1114                        RestConstants.JOBTYPE_PARAM, "wf", RestConstants.OFFSET_PARAM, Integer.toString(start),
1115                        RestConstants.LEN_PARAM, Integer.toString(len)));
1116            }
1117    
1118            @Override
1119            @SuppressWarnings("unchecked")
1120            protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1121                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1122                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1123                    Reader reader = new InputStreamReader(conn.getInputStream());
1124                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1125                    JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS);
1126                    if (workflows == null) {
1127                        workflows = new JSONArray();
1128                    }
1129                    return JsonToBean.createWorkflowJobList(workflows);
1130                }
1131                else {
1132                    handleError(conn);
1133                }
1134                return null;
1135            }
1136        }
1137    
1138        private class CoordJobsStatus extends ClientCallable<List<CoordinatorJob>> {
1139    
1140            CoordJobsStatus(String filter, int start, int len) {
1141                super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1142                        RestConstants.JOBTYPE_PARAM, "coord", RestConstants.OFFSET_PARAM, Integer.toString(start),
1143                        RestConstants.LEN_PARAM, Integer.toString(len)));
1144            }
1145    
1146            @Override
1147            protected List<CoordinatorJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1148                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1149                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1150                    Reader reader = new InputStreamReader(conn.getInputStream());
1151                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1152                    JSONArray jobs = (JSONArray) json.get(JsonTags.COORDINATOR_JOBS);
1153                    if (jobs == null) {
1154                        jobs = new JSONArray();
1155                    }
1156                    return JsonToBean.createCoordinatorJobList(jobs);
1157                }
1158                else {
1159                    handleError(conn);
1160                }
1161                return null;
1162            }
1163        }
1164    
1165        private class BundleJobsStatus extends ClientCallable<List<BundleJob>> {
1166    
1167            BundleJobsStatus(String filter, int start, int len) {
1168                super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1169                        RestConstants.JOBTYPE_PARAM, "bundle", RestConstants.OFFSET_PARAM, Integer.toString(start),
1170                        RestConstants.LEN_PARAM, Integer.toString(len)));
1171            }
1172    
1173            @Override
1174            protected List<BundleJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1175                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1176                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1177                    Reader reader = new InputStreamReader(conn.getInputStream());
1178                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1179                    JSONArray jobs = (JSONArray) json.get(JsonTags.BUNDLE_JOBS);
1180                    if (jobs == null) {
1181                        jobs = new JSONArray();
1182                    }
1183                    return JsonToBean.createBundleJobList(jobs);
1184                }
1185                else {
1186                    handleError(conn);
1187                }
1188                return null;
1189            }
1190        }
1191    
1192        private class BulkResponseStatus extends ClientCallable<List<BulkResponse>> {
1193    
1194            BulkResponseStatus(String filter, int start, int len) {
1195                super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_BULK_PARAM, filter,
1196                        RestConstants.OFFSET_PARAM, Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len)));
1197            }
1198    
1199            @Override
1200            protected List<BulkResponse> call(HttpURLConnection conn) throws IOException, OozieClientException {
1201                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1202                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1203                    Reader reader = new InputStreamReader(conn.getInputStream());
1204                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1205                    JSONArray results = (JSONArray) json.get(JsonTags.BULK_RESPONSES);
1206                    if (results == null) {
1207                        results = new JSONArray();
1208                    }
1209                    return JsonToBean.createBulkResponseList(results);
1210                }
1211                else {
1212                    handleError(conn);
1213                }
1214                return null;
1215            }
1216        }
1217    
1218        private class CoordRerun extends ClientCallable<List<CoordinatorAction>> {
1219    
1220            CoordRerun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
1221                super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1222                        RestConstants.JOB_COORD_ACTION_RERUN, RestConstants.JOB_COORD_RERUN_TYPE_PARAM, rerunType,
1223                        RestConstants.JOB_COORD_RERUN_SCOPE_PARAM, scope, RestConstants.JOB_COORD_RERUN_REFRESH_PARAM,
1224                        Boolean.toString(refresh), RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean
1225                                .toString(noCleanup)));
1226            }
1227    
1228            @Override
1229            protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException {
1230                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1231                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1232                    Reader reader = new InputStreamReader(conn.getInputStream());
1233                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1234                    JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS);
1235                    return JsonToBean.createCoordinatorActionList(coordActions);
1236                }
1237                else {
1238                    handleError(conn);
1239                }
1240                return null;
1241            }
1242        }
1243    
1244        private class BundleRerun extends ClientCallable<Void> {
1245    
1246            BundleRerun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
1247                super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1248                        RestConstants.JOB_BUNDLE_ACTION_RERUN, RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM,
1249                        coordScope, RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM, dateScope,
1250                        RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, Boolean.toString(refresh),
1251                        RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean.toString(noCleanup)));
1252            }
1253    
1254            @Override
1255            protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1256                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1257                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1258                    return null;
1259                }
1260                else {
1261                    handleError(conn);
1262                }
1263                return null;
1264            }
1265        }
1266    
1267        /**
1268         * Rerun coordinator actions.
1269         *
1270         * @param jobId coordinator jobId
1271         * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used
1272         * @param scope rerun scope for date or actionIds
1273         * @param refresh true if -refresh is given in command option
1274         * @param noCleanup true if -nocleanup is given in command option
1275         * @throws OozieClientException
1276         */
1277        public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh,
1278                boolean noCleanup) throws OozieClientException {
1279            return new CoordRerun(jobId, rerunType, scope, refresh, noCleanup).call();
1280        }
1281    
1282        /**
1283         * Rerun bundle coordinators.
1284         *
1285         * @param jobId bundle jobId
1286         * @param coordScope rerun scope for coordinator jobs
1287         * @param dateScope rerun scope for date
1288         * @param refresh true if -refresh is given in command option
1289         * @param noCleanup true if -nocleanup is given in command option
1290         * @throws OozieClientException
1291         */
1292        public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
1293                throws OozieClientException {
1294            return new BundleRerun(jobId, coordScope, dateScope, refresh, noCleanup).call();
1295        }
1296    
1297        /**
1298         * Return the info of the workflow jobs that match the filter.
1299         *
1300         * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1301         * @param start jobs offset, base 1.
1302         * @param len number of jobs to return.
1303         * @return a list with the workflow jobs info, without node details.
1304         * @throws OozieClientException thrown if the jobs info could not be retrieved.
1305         */
1306        public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException {
1307            return new JobsStatus(filter, start, len).call();
1308        }
1309    
1310        /**
1311         * Return the info of the workflow jobs that match the filter.
1312         * <p/>
1313         * It returns the first 100 jobs that match the filter.
1314         *
1315         * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1316         * @return a list with the workflow jobs info, without node details.
1317         * @throws OozieClientException thrown if the jobs info could not be retrieved.
1318         */
1319        public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException {
1320            return getJobsInfo(filter, 1, 50);
1321        }
1322    
1323        /**
1324         * Print sla info about coordinator and workflow jobs and actions.
1325         *
1326         * @param start starting offset
1327         * @param len number of results
1328         * @throws OozieClientException
1329         */
1330        public void getSlaInfo(int start, int len, String filter) throws OozieClientException {
1331            new SlaInfo(start, len, filter).call();
1332        }
1333    
1334        private class SlaInfo extends ClientCallable<Void> {
1335    
1336            SlaInfo(int start, int len, String filter) {
1337                super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID,
1338                        Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len),
1339                        RestConstants.JOBS_FILTER_PARAM, filter));
1340            }
1341    
1342            @Override
1343            @SuppressWarnings("unchecked")
1344            protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1345                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1346                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1347                    BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
1348                    String line = null;
1349                    while ((line = br.readLine()) != null) {
1350                        System.out.println(line);
1351                    }
1352                }
1353                else {
1354                    handleError(conn);
1355                }
1356                return null;
1357            }
1358        }
1359    
1360        private class JobIdAction extends ClientCallable<String> {
1361    
1362            JobIdAction(String externalId) {
1363                super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, "wf",
1364                        RestConstants.JOBS_EXTERNAL_ID_PARAM, externalId));
1365            }
1366    
1367            @Override
1368            @SuppressWarnings("unchecked")
1369            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
1370                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1371                    Reader reader = new InputStreamReader(conn.getInputStream());
1372                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1373                    return (String) json.get(JsonTags.JOB_ID);
1374                }
1375                else {
1376                    handleError(conn);
1377                }
1378                return null;
1379            }
1380        }
1381    
1382        /**
1383         * Return the workflow job Id for an external Id.
1384         * <p/>
1385         * The external Id must have provided at job creation time.
1386         *
1387         * @param externalId external Id given at job creation time.
1388         * @return the workflow job Id for an external Id, <code>null</code> if none.
1389         * @throws OozieClientException thrown if the operation could not be done.
1390         */
1391        public String getJobId(String externalId) throws OozieClientException {
1392            return new JobIdAction(externalId).call();
1393        }
1394    
1395        private class SetSystemMode extends ClientCallable<Void> {
1396    
1397            public SetSystemMode(SYSTEM_MODE status) {
1398                super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams(
1399                        RestConstants.ADMIN_SYSTEM_MODE_PARAM, status + ""));
1400            }
1401    
1402            @Override
1403            public Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1404                if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
1405                    handleError(conn);
1406                }
1407                return null;
1408            }
1409        }
1410    
1411        /**
1412         * Enable or disable safe mode. Used by OozieCLI. In safe mode, Oozie would not accept any commands except status
1413         * command to change and view the safe mode status.
1414         *
1415         * @param status true to enable safe mode, false to disable safe mode.
1416         * @throws OozieClientException if it fails to set the safe mode status.
1417         */
1418        public void setSystemMode(SYSTEM_MODE status) throws OozieClientException {
1419            new SetSystemMode(status).call();
1420        }
1421    
1422        private class GetSystemMode extends ClientCallable<SYSTEM_MODE> {
1423    
1424            GetSystemMode() {
1425                super("GET", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams());
1426            }
1427    
1428            @Override
1429            protected SYSTEM_MODE call(HttpURLConnection conn) throws IOException, OozieClientException {
1430                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1431                    Reader reader = new InputStreamReader(conn.getInputStream());
1432                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1433                    return SYSTEM_MODE.valueOf((String) json.get(JsonTags.OOZIE_SYSTEM_MODE));
1434                }
1435                else {
1436                    handleError(conn);
1437                }
1438                return SYSTEM_MODE.NORMAL;
1439            }
1440        }
1441    
1442        /**
1443         * Returns if Oozie is in safe mode or not.
1444         *
1445         * @return true if safe mode is ON<br>
1446         *         false if safe mode is OFF
1447         * @throws OozieClientException throw if it could not obtain the safe mode status.
1448         */
1449        /*
1450         * public boolean isInSafeMode() throws OozieClientException { return new GetSafeMode().call(); }
1451         */
1452        public SYSTEM_MODE getSystemMode() throws OozieClientException {
1453            return new GetSystemMode().call();
1454        }
1455    
1456        private class GetBuildVersion extends ClientCallable<String> {
1457    
1458            GetBuildVersion() {
1459                super("GET", RestConstants.ADMIN, RestConstants.ADMIN_BUILD_VERSION_RESOURCE, prepareParams());
1460            }
1461    
1462            @Override
1463            protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
1464                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1465                    Reader reader = new InputStreamReader(conn.getInputStream());
1466                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1467                    return (String) json.get(JsonTags.BUILD_VERSION);
1468                }
1469                else {
1470                    handleError(conn);
1471                }
1472                return null;
1473            }
1474        }
1475    
1476        /**
1477         * Return the Oozie server build version.
1478         *
1479         * @return the Oozie server build version.
1480         * @throws OozieClientException throw if it the server build version could not be retrieved.
1481         */
1482        public String getServerBuildVersion() throws OozieClientException {
1483            return new GetBuildVersion().call();
1484        }
1485    
1486        /**
1487         * Return the Oozie client build version.
1488         *
1489         * @return the Oozie client build version.
1490         */
1491        public String getClientBuildVersion() {
1492            return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION);
1493        }
1494    
1495        /**
1496         * Return the info of the coordinator jobs that match the filter.
1497         *
1498         * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1499         * @param start jobs offset, base 1.
1500         * @param len number of jobs to return.
1501         * @return a list with the coordinator jobs info
1502         * @throws OozieClientException thrown if the jobs info could not be retrieved.
1503         */
1504        public List<CoordinatorJob> getCoordJobsInfo(String filter, int start, int len) throws OozieClientException {
1505            return new CoordJobsStatus(filter, start, len).call();
1506        }
1507    
1508        /**
1509         * Return the info of the bundle jobs that match the filter.
1510         *
1511         * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1512         * @param start jobs offset, base 1.
1513         * @param len number of jobs to return.
1514         * @return a list with the bundle jobs info
1515         * @throws OozieClientException thrown if the jobs info could not be retrieved.
1516         */
1517        public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException {
1518            return new BundleJobsStatus(filter, start, len).call();
1519        }
1520    
1521        public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException {
1522            return new BulkResponseStatus(filter, start, len).call();
1523        }
1524    
1525        private class GetQueueDump extends ClientCallable<List<String>> {
1526            GetQueueDump() {
1527                super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams());
1528            }
1529    
1530            @Override
1531            protected List<String> call(HttpURLConnection conn) throws IOException, OozieClientException {
1532                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1533                    Reader reader = new InputStreamReader(conn.getInputStream());
1534                    JSONObject json = (JSONObject) JSONValue.parse(reader);
1535                    JSONArray queueDumpArray = (JSONArray) json.get(JsonTags.QUEUE_DUMP);
1536    
1537                    List<String> list = new ArrayList<String>();
1538                    list.add("[Server Queue Dump]:");
1539                    for (Object o : queueDumpArray) {
1540                        JSONObject entry = (JSONObject) o;
1541                        if (entry.get(JsonTags.CALLABLE_DUMP) != null) {
1542                            String value = (String) entry.get(JsonTags.CALLABLE_DUMP);
1543                            list.add(value);
1544                        }
1545                    }
1546                    if (queueDumpArray.size() == 0) {
1547                        list.add("Queue dump is null!");
1548                    }
1549    
1550                    list.add("******************************************");
1551                    list.add("[Server Uniqueness Map Dump]:");
1552    
1553                    JSONArray uniqueDumpArray = (JSONArray) json.get(JsonTags.UNIQUE_MAP_DUMP);
1554                    for (Object o : uniqueDumpArray) {
1555                        JSONObject entry = (JSONObject) o;
1556                        if (entry.get(JsonTags.UNIQUE_ENTRY_DUMP) != null) {
1557                            String value = (String) entry.get(JsonTags.UNIQUE_ENTRY_DUMP);
1558                            list.add(value);
1559                        }
1560                    }
1561                    if (uniqueDumpArray.size() == 0) {
1562                        list.add("Uniqueness dump is null!");
1563                    }
1564                    return list;
1565                }
1566                else {
1567                    handleError(conn);
1568                }
1569                return null;
1570            }
1571        }
1572    
1573        /**
1574         * Return the Oozie queue's commands' dump
1575         *
1576         * @return the list of strings of callable identification in queue
1577         * @throws OozieClientException throw if it the queue dump could not be retrieved.
1578         */
1579        public List<String> getQueueDump() throws OozieClientException {
1580            return new GetQueueDump().call();
1581        }
1582    
1583        /**
1584         * Check if the string is not null or not empty.
1585         *
1586         * @param str
1587         * @param name
1588         * @return string
1589         */
1590        public static String notEmpty(String str, String name) {
1591            if (str == null) {
1592                throw new IllegalArgumentException(name + " cannot be null");
1593            }
1594            if (str.length() == 0) {
1595                throw new IllegalArgumentException(name + " cannot be empty");
1596            }
1597            return str;
1598        }
1599    
1600        /**
1601         * Check if the object is not null.
1602         *
1603         * @param <T>
1604         * @param obj
1605         * @param name
1606         * @return string
1607         */
1608        public static <T> T notNull(T obj, String name) {
1609            if (obj == null) {
1610                throw new IllegalArgumentException(name + " cannot be null");
1611            }
1612            return obj;
1613        }
1614    
1615    }