001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.client;
020
021import org.apache.oozie.BuildInfo;
022import org.apache.oozie.client.rest.JsonTags;
023import org.apache.oozie.client.rest.JsonToBean;
024import org.apache.oozie.client.rest.RestConstants;
025import org.apache.oozie.client.retry.ConnectionRetriableClient;
026import org.json.simple.JSONArray;
027import org.json.simple.JSONObject;
028import org.json.simple.JSONValue;
029import org.w3c.dom.Document;
030import org.w3c.dom.Element;
031
032import javax.xml.parsers.DocumentBuilderFactory;
033import javax.xml.transform.Transformer;
034import javax.xml.transform.TransformerFactory;
035import javax.xml.transform.dom.DOMSource;
036import javax.xml.transform.stream.StreamResult;
037import java.io.BufferedReader;
038import java.io.File;
039import java.io.FileInputStream;
040import java.io.IOException;
041import java.io.InputStream;
042import java.io.InputStreamReader;
043import java.io.OutputStream;
044import java.io.PrintStream;
045import java.io.Reader;
046import java.net.HttpURLConnection;
047import java.net.URL;
048import java.net.URLEncoder;
049import java.util.ArrayList;
050import java.util.Collections;
051import java.util.HashMap;
052import java.util.HashSet;
053import java.util.Iterator;
054import java.util.LinkedHashMap;
055import java.util.List;
056import java.util.Map;
057import java.util.Map.Entry;
058import java.util.Properties;
059import java.util.Set;
060import java.util.concurrent.Callable;
061
062/**
 * Client API to submit and manage Oozie workflow jobs against an Oozie instance.
064 * <p>
065 * This class is thread safe.
066 * <p>
067 * Syntax for filter for the {@link #getJobsInfo(String)} {@link #getJobsInfo(String, int, int)} methods:
068 * <code>[NAME=VALUE][;NAME=VALUE]*</code>.
069 * <p>
070 * Valid filter names are:
071 * <p>
072 * <ul>
073 * <li>name: the workflow application name from the workflow definition.</li>
074 * <li>user: the user that submitted the job.</li>
075 * <li>group: the group for the job.</li>
076 * <li>status: the status of the job.</li>
077 * </ul>
078 * <p>
079 * The query will do an AND among all the filter names. The query will do an OR among all the filter values for the same
080 * name. Multiple values must be specified as different name value pairs.
081 */
082public class OozieClient {
083
    // Web-service protocol versions. validateWSVersion() prefers WS_PROTOCOL_VERSION and falls back to
    // version 1 and then version 0 when the server does not list the newer one.
    public static final long WS_PROTOCOL_VERSION_0 = 0;

    public static final long WS_PROTOCOL_VERSION_1 = 1;

    public static final long WS_PROTOCOL_VERSION = 2; // pointer to current version

    // Configuration property names. createConfiguration() fills in USER_NAME; the remaining keys are only
    // declared here (presumably set by callers in the job configuration -- confirm against the server docs).
    public static final String USER_NAME = "user.name";

    @Deprecated
    public static final String GROUP_NAME = "group.name";

    public static final String JOB_ACL = "oozie.job.acl";

    public static final String APP_PATH = "oozie.wf.application.path";

    public static final String COORDINATOR_APP_PATH = "oozie.coord.application.path";

    public static final String BUNDLE_APP_PATH = "oozie.bundle.application.path";

    public static final String BUNDLE_ID = "oozie.bundle.id";

    public static final String EXTERNAL_ID = "oozie.wf.external.id";

    public static final String WORKFLOW_NOTIFICATION_PROXY = "oozie.wf.workflow.notification.proxy";

    public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url";

    public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url";

    public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url";

    public static final String COORD_ACTION_NOTIFICATION_PROXY = "oozie.coord.action.notification.proxy";

    public static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes";

    public static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes";

    public static final String LOG_TOKEN = "oozie.wf.log.token";

    public static final String ACTION_MAX_RETRIES = "oozie.wf.action.max.retries";

    public static final String ACTION_RETRY_INTERVAL = "oozie.wf.action.retry.interval";

    // Filter names. The class Javadoc describes the NAME=VALUE filter syntax and lists user, group, name
    // and status; the other names are presumably accepted by other query types -- confirm before relying
    // on them.
    public static final String FILTER_USER = "user";

    public static final String FILTER_GROUP = "group";

    public static final String FILTER_NAME = "name";

    public static final String FILTER_STATUS = "status";

    public static final String FILTER_NOMINAL_TIME = "nominaltime";

    public static final String FILTER_FREQUENCY = "frequency";

    public static final String FILTER_ID = "id";

    public static final String FILTER_UNIT = "unit";

    public static final String FILTER_JOBID = "jobid";

    public static final String FILTER_APPNAME = "appname";

    public static final String FILTER_SLA_APPNAME = "app_name";

    // NOTE: same literal value as FILTER_ID ("id").
    public static final String FILTER_SLA_ID = "id";

    public static final String FILTER_SLA_PARENT_ID = "parent_id";

    public static final String FILTER_BUNDLE = "bundle";

    public static final String FILTER_SLA_EVENT_STATUS = "event_status";

    public static final String FILTER_SLA_STATUS = "sla_status";

    public static final String FILTER_SLA_NOMINAL_START = "nominal_start";

    public static final String FILTER_SLA_NOMINAL_END = "nominal_end";

    public static final String FILTER_CREATED_TIME_START = "startcreatedtime";

    public static final String FILTER_CREATED_TIME_END = "endcreatedtime";

    // SLA alert property names. The two derived keys below are built by appending a suffix to
    // SLA_DISABLE_ALERT, giving "oozie.sla.disable.alerts.older.than" and "oozie.sla.disable.alerts.coord".
    public static final String SLA_DISABLE_ALERT = "oozie.sla.disable.alerts";

    public static final String SLA_ENABLE_ALERT = "oozie.sla.enable.alerts";

    public static final String SLA_DISABLE_ALERT_OLDER_THAN = SLA_DISABLE_ALERT + ".older.than";

    public static final String SLA_DISABLE_ALERT_COORD = SLA_DISABLE_ALERT + ".coord";

    // Names usable in a job "change" value string (presumably NAME=VALUE pairs -- confirm against callers).
    public static final String CHANGE_VALUE_ENDTIME = "endtime";

    public static final String CHANGE_VALUE_PAUSETIME = "pausetime";

    public static final String CHANGE_VALUE_CONCURRENCY = "concurrency";

    public static final String CHANGE_VALUE_STATUS = "status";

    // Further configuration property names.
    public static final String LIBPATH = "oozie.libpath";

    public static final String USE_SYSTEM_LIBPATH = "oozie.use.system.libpath";

    public static final String OOZIE_SUSPEND_ON_NODES = "oozie.suspend.on.nodes";

    // Filter name whose value is expected to be one of the SORT_BY constants (TODO confirm with callers).
    public static final String FILTER_SORT_BY = "sortby";
190
191    public enum SORT_BY {
192        createdTime("createdTimestamp"), lastModifiedTime("lastModifiedTimestamp");
193        private final String fullname;
194
195        SORT_BY(String fullname) {
196            this.fullname = fullname;
197        }
198
199        public String getFullname() {
200            return fullname;
201        }
202    }
203
204    public static enum SYSTEM_MODE {
205        NORMAL, NOWEBSERVICE, SAFEMODE
206    }
207
208    private static final Set<String> COMPLETED_WF_STATUSES = new HashSet<String>();
209    private static final Set<String> COMPLETED_COORD_AND_BUNDLE_STATUSES = new HashSet<String>();
210    private static final Set<String> COMPLETED_COORD_ACTION_STATUSES = new HashSet<String>();
211    static {
212        COMPLETED_WF_STATUSES.add(WorkflowJob.Status.FAILED.toString());
213        COMPLETED_WF_STATUSES.add(WorkflowJob.Status.KILLED.toString());
214        COMPLETED_WF_STATUSES.add(WorkflowJob.Status.SUCCEEDED.toString());
215        COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.FAILED.toString());
216        COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.KILLED.toString());
217        COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.SUCCEEDED.toString());
218        COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.DONEWITHERROR.toString());
219        COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.IGNORED.toString());
220        COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.FAILED.toString());
221        COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.IGNORED.toString());
222        COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.KILLED.toString());
223        COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.SKIPPED.toString());
224        COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.SUCCEEDED.toString());
225        COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.TIMEDOUT.toString());
226    }
227
    /**
     * debugMode =0 means no debugging. 1 means debugging on.
     * <p>
     * When greater than zero, requests are echoed to <code>System.out</code> (method and URL) and
     * {@link #writeToXml(Properties, OutputStream)} additionally prints the generated XML there.
     */
    public int debugMode = 0;

    // Passed to ConnectionRetriableClient when a connection is created.
    private int retryCount = 4;


    // Oozie base URL; the public constructor guarantees a trailing "/".
    private String baseUrl;
    // baseUrl plus the negotiated "v<N>/" element; assigned by validateWSVersion().
    private String protocolUrl;
    // True once validateWSVersion() has completed successfully, so the check is not repeated.
    private boolean validatedVersion = false;
    // Protocol versions reported by the server, as parsed from the versions resource.
    private JSONArray supportedVersions;
    // HTTP headers copied onto every connection by createConnection().
    private final Map<String, String> headers = new HashMap<String, String>();

    // User to impersonate on the current thread; set and cleared by doAs().
    private static final ThreadLocal<String> USER_NAME_TL = new ThreadLocal<String>();
243
244    /**
245     * Allows to impersonate other users in the Oozie server. The current user
246     * must be configured as a proxyuser in Oozie.
247     * <p>
248     * IMPORTANT: impersonation happens only with Oozie client requests done within
249     * doAs() calls.
250     *
251     * @param userName user to impersonate.
252     * @param callable callable with {@link OozieClient} calls impersonating the specified user.
253     * @return any response returned by the {@link Callable#call()} method.
254     * @throws Exception thrown by the {@link Callable#call()} method.
255     */
256    public static <T> T doAs(String userName, Callable<T> callable) throws Exception {
257        notEmpty(userName, "userName");
258        notNull(callable, "callable");
259        try {
260            USER_NAME_TL.set(userName);
261            return callable.call();
262        }
263        finally {
264            USER_NAME_TL.remove();
265        }
266    }
267
268    protected OozieClient() {
269    }
270
271    /**
272     * Create a Workflow client instance.
273     *
274     * @param oozieUrl URL of the Oozie instance it will interact with.
275     */
276    public OozieClient(String oozieUrl) {
277        this.baseUrl = notEmpty(oozieUrl, "oozieUrl");
278        if (!this.baseUrl.endsWith("/")) {
279            this.baseUrl += "/";
280        }
281    }
282
283    /**
284     * Return the Oozie URL of the workflow client instance.
285     * <p>
286     * This URL is the base URL fo the Oozie system, with not protocol versioning.
287     *
288     * @return the Oozie URL of the workflow client instance.
289     */
290    public String getOozieUrl() {
291        return baseUrl;
292    }
293
294    /**
295     * Return the Oozie URL used by the client and server for WS communications.
296     * <p>
297     * This URL is the original URL plus the versioning element path.
298     *
299     * @return the Oozie URL used by the client and server for communication.
300     * @throws OozieClientException thrown in the client and the server are not protocol compatible.
301     */
302    public String getProtocolUrl() throws OozieClientException {
303        validateWSVersion();
304        return protocolUrl;
305    }
306
307    /**
308     * @return current debug Mode
309     */
310    public int getDebugMode() {
311        return debugMode;
312    }
313
314    /**
315     * Set debug mode.
316     *
317     * @param debugMode : 0 means no debugging. 1 means debugging
318     */
319    public void setDebugMode(int debugMode) {
320        this.debugMode = debugMode;
321    }
322
323    public int getRetryCount() {
324        return retryCount;
325    }
326
327
328    public void setRetryCount(int retryCount) {
329        this.retryCount = retryCount;
330    }
331
332    private String getBaseURLForVersion(long protocolVersion) throws OozieClientException {
333        try {
334            if (supportedVersions == null) {
335                supportedVersions = getSupportedProtocolVersions();
336            }
337            if (supportedVersions == null) {
338                throw new OozieClientException("HTTP error", "no response message");
339            }
340            if (supportedVersions.contains(protocolVersion)) {
341                return baseUrl + "v" + protocolVersion + "/";
342            }
343            else {
344                throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, "Protocol version "
345                        + protocolVersion + " is not supported");
346            }
347        }
348        catch (IOException e) {
349            throw new OozieClientException(OozieClientException.IO_ERROR, e);
350        }
351    }
352
353    /**
354     * Validate that the Oozie client and server instances are protocol compatible.
355     *
356     * @throws OozieClientException thrown in the client and the server are not protocol compatible.
357     */
358    public synchronized void validateWSVersion() throws OozieClientException {
359        if (!validatedVersion) {
360            try {
361                supportedVersions = getSupportedProtocolVersions();
362                if (supportedVersions == null) {
363                    throw new OozieClientException("HTTP error", "no response message");
364                }
365                if (!supportedVersions.contains(WS_PROTOCOL_VERSION)
366                        && !supportedVersions.contains(WS_PROTOCOL_VERSION_1)
367                        && !supportedVersions.contains(WS_PROTOCOL_VERSION_0)) {
368                    StringBuilder msg = new StringBuilder();
369                    msg.append("Supported version [").append(WS_PROTOCOL_VERSION)
370                            .append("] or less, Unsupported versions[");
371                    String separator = "";
372                    for (Object version : supportedVersions) {
373                        msg.append(separator).append(version);
374                    }
375                    msg.append("]");
376                    throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, msg.toString());
377                }
378                if (supportedVersions.contains(WS_PROTOCOL_VERSION)) {
379                    protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION + "/";
380                }
381                else if (supportedVersions.contains(WS_PROTOCOL_VERSION_1)) {
382                    protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_1 + "/";
383                }
384                else {
385                    if (supportedVersions.contains(WS_PROTOCOL_VERSION_0)) {
386                        protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_0 + "/";
387                    }
388                }
389            }
390            catch (IOException ex) {
391                throw new OozieClientException(OozieClientException.IO_ERROR, ex);
392            }
393            validatedVersion = true;
394        }
395    }
396
397    private JSONArray getSupportedProtocolVersions() throws IOException, OozieClientException {
398        JSONArray versions = null;
399        final URL url = new URL(baseUrl + RestConstants.VERSIONS);
400
401        HttpURLConnection conn = createRetryableConnection(url, "GET");
402
403        if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
404            versions = (JSONArray) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
405        }
406        else {
407            handleError(conn);
408        }
409        return versions;
410    }
411
412    /**
413     * Create an empty configuration with just the {@link #USER_NAME} set to the JVM user name.
414     *
415     * @return an empty configuration.
416     */
417    public Properties createConfiguration() {
418        Properties conf = new Properties();
419        String userName = USER_NAME_TL.get();
420        if (userName == null) {
421            userName = System.getProperty("user.name");
422        }
423        conf.setProperty(USER_NAME, userName);
424        return conf;
425    }
426
427    /**
428     * Set a HTTP header to be used in the WS requests by the workflow instance.
429     *
430     * @param name header name.
431     * @param value header value.
432     */
433    public void setHeader(String name, String value) {
434        headers.put(notEmpty(name, "name"), notNull(value, "value"));
435    }
436
437    /**
438     * Get the value of a set HTTP header from the workflow instance.
439     *
440     * @param name header name.
441     * @return header value, <code>null</code> if not set.
442     */
443    public String getHeader(String name) {
444        return headers.get(notEmpty(name, "name"));
445    }
446
447    /**
448     * Get the set HTTP header
449     *
450     * @return map of header key and value
451     */
452    public Map<String, String> getHeaders() {
453        return headers;
454    }
455
456    /**
457     * Remove a HTTP header from the workflow client instance.
458     *
459     * @param name header name.
460     */
461    public void removeHeader(String name) {
462        headers.remove(notEmpty(name, "name"));
463    }
464
465    /**
466     * Return an iterator with all the header names set in the workflow instance.
467     *
468     * @return header names.
469     */
470    public Iterator<String> getHeaderNames() {
471        return Collections.unmodifiableMap(headers).keySet().iterator();
472    }
473
474    private URL createURL(Long protocolVersion, String collection, String resource, Map<String, String> parameters)
475            throws IOException, OozieClientException {
476        validateWSVersion();
477        StringBuilder sb = new StringBuilder();
478        if (protocolVersion == null) {
479            sb.append(protocolUrl);
480        }
481        else {
482            sb.append(getBaseURLForVersion(protocolVersion));
483        }
484        sb.append(collection);
485        if (resource != null && resource.length() > 0) {
486            sb.append("/").append(resource);
487        }
488        if (parameters.size() > 0) {
489            String separator = "?";
490            for (Map.Entry<String, String> param : parameters.entrySet()) {
491                if (param.getValue() != null) {
492                    sb.append(separator).append(URLEncoder.encode(param.getKey(), "UTF-8")).append("=").append(
493                            URLEncoder.encode(param.getValue(), "UTF-8"));
494                    separator = "&";
495                }
496            }
497        }
498        return new URL(sb.toString());
499    }
500
501    private boolean validateCommand(String url) {
502        {
503            if (protocolUrl.contains(baseUrl + "v0")) {
504                if (url.contains("dryrun") || url.contains("jobtype=c") || url.contains("systemmode")) {
505                    return false;
506                }
507            }
508        }
509        return true;
510    }
511    /**
512     * Create retryable http connection to oozie server.
513     *
514     * @param url
515     * @param method
516     * @return connection
517     * @throws IOException
518     */
519    protected HttpURLConnection createRetryableConnection(final URL url, final String method) throws IOException{
520        return (HttpURLConnection) new ConnectionRetriableClient(getRetryCount()) {
521            @Override
522            public Object doExecute(URL url, String method) throws IOException, OozieClientException {
523                HttpURLConnection conn = createConnection(url, method);
524                return conn;
525            }
526        }.execute(url, method);
527    }
528
529    /**
530     * Create http connection to oozie server.
531     *
532     * @param url
533     * @param method
534     * @return connection
535     * @throws IOException
536     * @throws OozieClientException
537     */
538    protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException {
539        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
540        conn.setRequestMethod(method);
541        if (method.equals("POST") || method.equals("PUT")) {
542            conn.setDoOutput(true);
543        }
544        for (Map.Entry<String, String> header : headers.entrySet()) {
545            conn.setRequestProperty(header.getKey(), header.getValue());
546        }
547        return conn;
548    }
549
550    protected abstract class ClientCallable<T> implements Callable<T> {
551        private final String method;
552        private final String collection;
553        private final String resource;
554        private final Map<String, String> params;
555        private final Long protocolVersion;
556
557        public ClientCallable(String method, String collection, String resource, Map<String, String> params) {
558            this(method, null, collection, resource, params);
559        }
560
561        public ClientCallable(String method, Long protocolVersion, String collection, String resource, Map<String, String> params) {
562            this.method = method;
563            this.protocolVersion = protocolVersion;
564            this.collection = collection;
565            this.resource = resource;
566            this.params = params;
567        }
568
569        public T call() throws OozieClientException {
570            try {
571                URL url = createURL(protocolVersion, collection, resource, params);
572                if (validateCommand(url.toString())) {
573                    if (getDebugMode() > 0) {
574                        System.out.println(method + " " + url);
575                    }
576                    return call(createRetryableConnection(url, method));
577                }
578                else {
579                    System.out.println("Option not supported in target server. Supported only on Oozie-2.0 or greater."
580                            + " Use 'oozie help' for details");
581                    throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, new Exception());
582                }
583            }
584            catch (IOException ex) {
585                throw new OozieClientException(OozieClientException.IO_ERROR, ex);
586            }
587        }
588
589        protected abstract T call(HttpURLConnection conn) throws IOException, OozieClientException;
590    }
591
592    protected abstract class MapClientCallable extends ClientCallable<Map<String, String>> {
593
594        MapClientCallable(String method, String collection, String resource, Map<String, String> params) {
595            super(method, collection, resource, params);
596        }
597
598        @Override
599        protected Map<String, String> call(HttpURLConnection conn) throws IOException, OozieClientException {
600            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
601                Reader reader = new InputStreamReader(conn.getInputStream());
602                JSONObject json = (JSONObject) JSONValue.parse(reader);
603                Map<String, String> map = new HashMap<String, String>();
604                for (Object key : json.keySet()) {
605                    map.put((String)key, (String)json.get(key));
606                }
607                return map;
608            }
609            else {
610                handleError(conn);
611            }
612            return null;
613        }
614    }
615
616    static void handleError(HttpURLConnection conn) throws IOException, OozieClientException {
617        int status = conn.getResponseCode();
618        String error = conn.getHeaderField(RestConstants.OOZIE_ERROR_CODE);
619        String message = conn.getHeaderField(RestConstants.OOZIE_ERROR_MESSAGE);
620
621        if (error == null) {
622            error = "HTTP error code: " + status;
623        }
624
625        if (message == null) {
626            message = conn.getResponseMessage();
627        }
628        throw new OozieClientException(error, message);
629    }
630
631    static Map<String, String> prepareParams(String... params) {
632        Map<String, String> map = new LinkedHashMap<String, String>();
633        for (int i = 0; i < params.length; i = i + 2) {
634            map.put(params[i], params[i + 1]);
635        }
636        String doAsUserName = USER_NAME_TL.get();
637        if (doAsUserName != null) {
638            map.put(RestConstants.DO_AS_PARAM, doAsUserName);
639        }
640        return map;
641    }
642
643    public void writeToXml(Properties props, OutputStream out) throws IOException {
644        try {
645            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
646            Element conf = doc.createElement("configuration");
647            doc.appendChild(conf);
648            conf.appendChild(doc.createTextNode("\n"));
649            for (String name : props.stringPropertyNames()) { // Properties whose key or value is not of type String are omitted.
650                String value = props.getProperty(name);
651                Element propNode = doc.createElement("property");
652                conf.appendChild(propNode);
653
654                Element nameNode = doc.createElement("name");
655                nameNode.appendChild(doc.createTextNode(name.trim()));
656                propNode.appendChild(nameNode);
657
658                Element valueNode = doc.createElement("value");
659                valueNode.appendChild(doc.createTextNode(value.trim()));
660                propNode.appendChild(valueNode);
661
662                conf.appendChild(doc.createTextNode("\n"));
663            }
664
665            DOMSource source = new DOMSource(doc);
666            StreamResult result = new StreamResult(out);
667            TransformerFactory transFactory = TransformerFactory.newInstance();
668            transFactory.setFeature("http://javax.xml.XMLConstants/feature/secure-processing", true);
669            Transformer transformer = transFactory.newTransformer();
670            transformer.transform(source, result);
671            if (getDebugMode() > 0) {
672                result = new StreamResult(System.out);
673                transformer.transform(source, result);
674                System.out.println();
675            }
676        }
677        catch (Exception e) {
678            throw new IOException(e);
679        }
680    }
681
682    private class JobSubmit extends ClientCallable<String> {
683        private final Properties conf;
684
685        JobSubmit(Properties conf, boolean start) {
686            super("POST", RestConstants.JOBS, "", (start) ? prepareParams(RestConstants.ACTION_PARAM,
687                    RestConstants.JOB_ACTION_START) : prepareParams());
688            this.conf = notNull(conf, "conf");
689        }
690
691        JobSubmit(String jobId, Properties conf) {
692            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
693                    RestConstants.JOB_ACTION_RERUN));
694            this.conf = notNull(conf, "conf");
695        }
696
697        public JobSubmit(Properties conf, String jobActionDryrun) {
698            super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.ACTION_PARAM,
699                    RestConstants.JOB_ACTION_DRYRUN));
700            this.conf = notNull(conf, "conf");
701        }
702
703        @Override
704        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
705            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
706            writeToXml(conf, conn.getOutputStream());
707            if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
708                JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
709                return (String) json.get(JsonTags.JOB_ID);
710            }
711            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
712                handleError(conn);
713            }
714            return null;
715        }
716    }
717
718    /**
719     * Submit a workflow job.
720     *
721     * @param conf job configuration.
722     * @return the job Id.
723     * @throws OozieClientException thrown if the job could not be submitted.
724     */
725    public String submit(Properties conf) throws OozieClientException {
726        return (new JobSubmit(conf, false)).call();
727    }
728
729    private class JobAction extends ClientCallable<Void> {
730
731        JobAction(String jobId, String action) {
732            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action));
733        }
734
735        JobAction(String jobId, String action, String params) {
736            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action,
737                    RestConstants.JOB_CHANGE_VALUE, params));
738        }
739
740        @Override
741        protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
742            if (!(conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
743                handleError(conn);
744            }
745            return null;
746        }
747    }
748
749    private class JobsAction extends ClientCallable<JSONObject> {
750
751        JobsAction(String action, String filter, String jobType, int start, int len) {
752            super("PUT", RestConstants.JOBS, "",
753                    prepareParams(RestConstants.ACTION_PARAM, action,
754                            RestConstants.JOB_FILTER_PARAM, filter, RestConstants.JOBTYPE_PARAM, jobType,
755                            RestConstants.OFFSET_PARAM, Integer.toString(start),
756                            RestConstants.LEN_PARAM, Integer.toString(len)));
757        }
758
759        @Override
760        protected JSONObject call(HttpURLConnection conn) throws IOException, OozieClientException {
761            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
762            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
763                Reader reader = new InputStreamReader(conn.getInputStream());
764                JSONObject json = (JSONObject) JSONValue.parse(reader);
765                return json;
766            }
767            else {
768                handleError(conn);
769            }
770            return null;
771        }
772    }
773    /**
774     * Update coord definition.
775     *
776     * @param jobId the job id
777     * @param conf the conf
778     * @param dryrun the dryrun
779     * @param showDiff the show diff
780     * @return the string
781     * @throws OozieClientException the oozie client exception
782     */
783    public String updateCoord(String jobId, Properties conf, String dryrun, String showDiff)
784            throws OozieClientException {
785        return (new UpdateCoord(jobId, conf, dryrun, showDiff)).call();
786    }
787
788    /**
789     * Update coord definition without properties.
790     *
791     * @param jobId the job id
792     * @param dryrun the dryrun
793     * @param showDiff the show diff
794     * @return the string
795     * @throws OozieClientException the oozie client exception
796     */
797    public String updateCoord(String jobId, String dryrun, String showDiff) throws OozieClientException {
798        return (new UpdateCoord(jobId, dryrun, showDiff)).call();
799    }
800
801    /**
802     * The Class UpdateCoord.
803     */
804    private class UpdateCoord extends ClientCallable<String> {
805        private final Properties conf;
806
807        public UpdateCoord(String jobId, Properties conf, String jobActionDryrun, String showDiff) {
808            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
809                    RestConstants.JOB_COORD_UPDATE, RestConstants.JOB_ACTION_DRYRUN, jobActionDryrun,
810                    RestConstants.JOB_ACTION_SHOWDIFF, showDiff));
811            this.conf = conf;
812        }
813
814        public UpdateCoord(String jobId, String jobActionDryrun, String showDiff) {
815            this(jobId, new Properties(), jobActionDryrun, showDiff);
816        }
817
818        @Override
819        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
820            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
821            writeToXml(conf, conn.getOutputStream());
822
823            if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
824                JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
825                JSONObject update = (JSONObject) json.get(JsonTags.COORD_UPDATE);
826                if (update != null) {
827                    return (String) update.get(JsonTags.COORD_UPDATE_DIFF);
828                }
829                else {
830                    return "";
831                }
832            }
833            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
834                handleError(conn);
835            }
836            return null;
837        }
838    }
839
840    /**
841     * dryrun for a given job
842     *
843     * @param conf Job configuration.
844     */
845    public String dryrun(Properties conf) throws OozieClientException {
846        return new JobSubmit(conf, RestConstants.JOB_ACTION_DRYRUN).call();
847    }
848
849    /**
850     * Start a workflow job.
851     *
852     * @param jobId job Id.
853     * @throws OozieClientException thrown if the job could not be started.
854     */
855    public void start(String jobId) throws OozieClientException {
856        new JobAction(jobId, RestConstants.JOB_ACTION_START).call();
857    }
858
859    /**
860     * Submit and start a workflow job.
861     *
862     * @param conf job configuration.
863     * @return the job Id.
864     * @throws OozieClientException thrown if the job could not be submitted.
865     */
866    public String run(Properties conf) throws OozieClientException {
867        return (new JobSubmit(conf, true)).call();
868    }
869
870    /**
871     * Rerun a workflow job.
872     *
873     * @param jobId job Id to rerun.
874     * @param conf configuration information for the rerun.
875     * @throws OozieClientException thrown if the job could not be started.
876     */
877    public void reRun(String jobId, Properties conf) throws OozieClientException {
878        new JobSubmit(jobId, conf).call();
879    }
880
881    /**
882     * Suspend a workflow job.
883     *
884     * @param jobId job Id.
885     * @throws OozieClientException thrown if the job could not be suspended.
886     */
887    public void suspend(String jobId) throws OozieClientException {
888        new JobAction(jobId, RestConstants.JOB_ACTION_SUSPEND).call();
889    }
890
891    /**
892     * Resume a workflow job.
893     *
894     * @param jobId job Id.
895     * @throws OozieClientException thrown if the job could not be resume.
896     */
897    public void resume(String jobId) throws OozieClientException {
898        new JobAction(jobId, RestConstants.JOB_ACTION_RESUME).call();
899    }
900
901    /**
902     * Kill a workflow/coord/bundle job.
903     *
904     * @param jobId job Id.
905     * @throws OozieClientException thrown if the job could not be killed.
906     */
907    public void kill(String jobId) throws OozieClientException {
908        new JobAction(jobId, RestConstants.JOB_ACTION_KILL).call();
909    }
910
911    /**
912     * Kill coordinator actions
913     * @param jobId coordinator Job Id
914     * @param rangeType type 'date' if -date is used, 'action-num' if -action is used
915     * @param scope kill scope for date or action nums
916     * @return list of coordinator actions that underwent kill
917     * @throws OozieClientException thrown if some actions could not be killed.
918     */
919    public List<CoordinatorAction> kill(String jobId, String rangeType, String scope) throws OozieClientException {
920        return new CoordActionsKill(jobId, rangeType, scope).call();
921    }
922
923    public JSONObject bulkModifyJobs(String actionType, String filter, String jobType, int start, int len)
924            throws OozieClientException {
925        return new JobsAction(actionType, filter, jobType, start, len).call();
926    }
927
928    public JSONObject killJobs(String filter, String jobType, int start, int len)
929            throws OozieClientException {
930        return bulkModifyJobs("kill", filter, jobType, start, len);
931    }
932
933    public JSONObject suspendJobs(String filter, String jobType, int start, int len)
934            throws OozieClientException {
935        return bulkModifyJobs("suspend", filter, jobType, start, len);
936    }
937
938    public JSONObject resumeJobs(String filter, String jobType, int start, int len)
939            throws OozieClientException {
940        return bulkModifyJobs("resume", filter, jobType, start, len);
941    }
942    /**
943     * Change a coordinator job.
944     *
945     * @param jobId job Id.
946     * @param changeValue change value.
947     * @throws OozieClientException thrown if the job could not be changed.
948     */
949    public void change(String jobId, String changeValue) throws OozieClientException {
950        new JobAction(jobId, RestConstants.JOB_ACTION_CHANGE, changeValue).call();
951    }
952
953    /**
954     * Ignore a coordinator job.
955     *
956     * @param jobId coord job Id.
957     * @param scope list of coord actions to be ignored
958     * @throws OozieClientException thrown if the job could not be changed.
959     */
960    public List<CoordinatorAction> ignore(String jobId, String scope) throws OozieClientException {
961        return new CoordIgnore(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, scope).call();
962    }
963
964    private class JobInfo extends ClientCallable<WorkflowJob> {
965
966        JobInfo(String jobId, int start, int len) {
967            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
968                    RestConstants.JOB_SHOW_INFO, RestConstants.OFFSET_PARAM, Integer.toString(start),
969                    RestConstants.LEN_PARAM, Integer.toString(len)));
970        }
971
972        @Override
973        protected WorkflowJob call(HttpURLConnection conn) throws IOException, OozieClientException {
974            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
975                Reader reader = new InputStreamReader(conn.getInputStream());
976                JSONObject json = (JSONObject) JSONValue.parse(reader);
977                return JsonToBean.createWorkflowJob(json);
978            }
979            else {
980                handleError(conn);
981            }
982            return null;
983        }
984    }
985
986    private class JMSInfo extends ClientCallable<JMSConnectionInfo> {
987
988        JMSInfo() {
989            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JMS_INFO, prepareParams());
990        }
991
992        protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException {
993            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
994                Reader reader = new InputStreamReader(conn.getInputStream());
995                JSONObject json = (JSONObject) JSONValue.parse(reader);
996                return JsonToBean.createJMSConnectionInfo(json);
997            }
998            else {
999                handleError(conn);
1000            }
1001            return null;
1002        }
1003    }
1004
1005    private class WorkflowActionInfo extends ClientCallable<WorkflowAction> {
1006        WorkflowActionInfo(String actionId) {
1007            super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1008                    RestConstants.JOB_SHOW_INFO));
1009        }
1010
1011        @Override
1012        protected WorkflowAction call(HttpURLConnection conn) throws IOException, OozieClientException {
1013            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1014                Reader reader = new InputStreamReader(conn.getInputStream());
1015                JSONObject json = (JSONObject) JSONValue.parse(reader);
1016                return JsonToBean.createWorkflowAction(json);
1017            }
1018            else {
1019                handleError(conn);
1020            }
1021            return null;
1022        }
1023    }
1024
1025    /**
1026     * Get the info of a workflow job.
1027     *
1028     * @param jobId job Id.
1029     * @return the job info.
1030     * @throws OozieClientException thrown if the job info could not be retrieved.
1031     */
1032    public WorkflowJob getJobInfo(String jobId) throws OozieClientException {
1033        return getJobInfo(jobId, 0, 0);
1034    }
1035
1036    /**
1037     * Get the JMS Connection info
1038     * @return JMSConnectionInfo object
1039     * @throws OozieClientException
1040     */
1041    public JMSConnectionInfo getJMSConnectionInfo() throws OozieClientException {
1042        return new JMSInfo().call();
1043    }
1044
1045    /**
1046     * Get the info of a workflow job and subset actions.
1047     *
1048     * @param jobId job Id.
1049     * @param start starting index in the list of actions belonging to the job
1050     * @param len number of actions to be returned
1051     * @return the job info.
1052     * @throws OozieClientException thrown if the job info could not be retrieved.
1053     */
1054    public WorkflowJob getJobInfo(String jobId, int start, int len) throws OozieClientException {
1055        return new JobInfo(jobId, start, len).call();
1056    }
1057
1058    /**
1059     * Get the info of a workflow action.
1060     *
1061     * @param actionId Id.
1062     * @return the workflow action info.
1063     * @throws OozieClientException thrown if the job info could not be retrieved.
1064     */
1065    public WorkflowAction getWorkflowActionInfo(String actionId) throws OozieClientException {
1066        return new WorkflowActionInfo(actionId).call();
1067    }
1068
1069    /**
1070     * Get the log of a workflow job.
1071     *
1072     * @param jobId job Id.
1073     * @return the job log.
1074     * @throws OozieClientException thrown if the job info could not be retrieved.
1075     */
1076    public String getJobLog(String jobId) throws OozieClientException {
1077        return new JobLog(jobId).call();
1078    }
1079
1080    /**
1081     * Get the audit log of a job.
1082     *
1083     * @param jobId
1084     * @param ps
1085     * @throws OozieClientException
1086     */
1087    public void getJobAuditLog(String jobId, PrintStream ps) throws OozieClientException {
1088        new JobAuditLog(jobId, ps).call();
1089    }
1090
1091    /**
1092     * Get the log of a job.
1093     *
1094     * @param jobId job Id.
1095     * @param logRetrievalType Based on which filter criteria the log is retrieved
1096     * @param logRetrievalScope Value for the retrieval type
1097     * @param logFilter log filter
1098     * @param ps Printstream of command line interface
1099     * @throws OozieClientException thrown if the job info could not be retrieved.
1100     */
1101    public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter,
1102            PrintStream ps) throws OozieClientException {
1103        new JobLog(jobId, logRetrievalType, logRetrievalScope, logFilter, ps).call();
1104    }
1105
1106    /**
1107     * Get the error log of a job.
1108     *
1109     * @param jobId
1110     * @param ps
1111     * @throws OozieClientException
1112     */
1113    public void getJobErrorLog(String jobId, PrintStream ps) throws OozieClientException {
1114        new JobErrorLog(jobId, ps).call();
1115    }
1116
1117    /**
1118     * Get the log of a job.
1119     *
1120     * @param jobId job Id.
1121     * @param logRetrievalType Based on which filter criteria the log is retrieved
1122     * @param logRetrievalScope Value for the retrieval type
1123     * @param ps Printstream of command line interface
1124     * @throws OozieClientException thrown if the job info could not be retrieved.
1125     */
1126    public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps)
1127            throws OozieClientException {
1128        getJobLog(jobId, logRetrievalType, logRetrievalScope, null, ps);
1129    }
1130
1131    private class JobLog extends JobMetadata {
1132        JobLog(String jobId) {
1133            super(jobId, RestConstants.JOB_SHOW_LOG);
1134        }
1135        JobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter, PrintStream ps) {
1136            super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, logFilter, ps);
1137        }
1138    }
1139
1140    private class JobErrorLog extends JobMetadata {
1141        JobErrorLog(String jobId, PrintStream ps) {
1142            super(jobId, RestConstants.JOB_SHOW_ERROR_LOG, ps);
1143        }
1144    }
1145
1146    private class JobAuditLog extends JobMetadata {
1147        JobAuditLog(String jobId, PrintStream ps) {
1148            super(jobId, RestConstants.JOB_SHOW_AUDIT_LOG, ps);
1149        }
1150    }
1151
1152
1153    /**
1154     * Gets the JMS topic name for a particular job
1155     * @param jobId given jobId
1156     * @return the JMS topic name
1157     * @throws OozieClientException
1158     */
1159    public String getJMSTopicName(String jobId) throws OozieClientException {
1160        return new JMSTopic(jobId).call();
1161    }
1162
1163    private class JMSTopic extends ClientCallable<String> {
1164
1165        JMSTopic(String jobId) {
1166            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1167                    RestConstants.JOB_SHOW_JMS_TOPIC));
1168        }
1169
1170        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
1171            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1172                Reader reader = new InputStreamReader(conn.getInputStream());
1173                JSONObject json = (JSONObject) JSONValue.parse(reader);
1174                return (String) json.get(JsonTags.JMS_TOPIC_NAME);
1175            }
1176            else {
1177                handleError(conn);
1178            }
1179            return null;
1180        }
1181    }
1182
1183    /**
1184     * Get the definition of a workflow job.
1185     *
1186     * @param jobId job Id.
1187     * @return the job log.
1188     * @throws OozieClientException thrown if the job info could not be retrieved.
1189     */
1190    public String getJobDefinition(String jobId) throws OozieClientException {
1191        return new JobDefinition(jobId).call();
1192    }
1193
1194    private class JobDefinition extends JobMetadata {
1195
1196        JobDefinition(String jobId) {
1197            super(jobId, RestConstants.JOB_SHOW_DEFINITION);
1198        }
1199    }
1200
1201    private class JobMetadata extends ClientCallable<String> {
1202        PrintStream printStream;
1203
1204        JobMetadata(String jobId, String metaType) {
1205            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1206                    metaType));
1207        }
1208
1209        JobMetadata(String jobId, String metaType, PrintStream ps) {
1210            this(jobId, metaType);
1211            printStream = ps;
1212
1213        }
1214
1215        JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, String logFilter,
1216                PrintStream ps) {
1217            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1218                    metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM,
1219                    logRetrievalScope, RestConstants.LOG_FILTER_OPTION, logFilter));
1220            printStream = ps;
1221        }
1222
1223        @Override
1224        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
1225            String returnVal = null;
1226            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1227                InputStream is = conn.getInputStream();
1228                InputStreamReader isr = new InputStreamReader(is);
1229                try {
1230                    if (printStream != null) {
1231                        sendToOutputStream(isr, -1);
1232                    }
1233                    else {
1234                        returnVal = getReaderAsString(isr, -1);
1235                    }
1236                }
1237                finally {
1238                    isr.close();
1239                }
1240            }
1241            else {
1242                handleError(conn);
1243            }
1244            return returnVal;
1245        }
1246
1247        /**
1248         * Output the log to command line interface
1249         *
1250         * @param reader reader to read into a string.
1251         * @param maxLen max content length allowed, if -1 there is no limit.
1252         * @throws IOException
1253         */
1254        private void sendToOutputStream(Reader reader, int maxLen) throws IOException {
1255            notNull(reader, "reader");
1256            StringBuilder sb = new StringBuilder();
1257            char[] buffer = new char[2048];
1258            int read;
1259            int count = 0;
1260            int noOfCharstoFlush = 1024;
1261            while ((read = reader.read(buffer)) > -1) {
1262                count += read;
1263                if ((maxLen > -1) && (count > maxLen)) {
1264                    break;
1265                }
1266                sb.append(buffer, 0, read);
1267                if (sb.length() > noOfCharstoFlush) {
1268                    printStream.print(sb.toString());
1269                    sb = new StringBuilder("");
1270                }
1271            }
1272            printStream.print(sb.toString());
1273        }
1274
1275        /**
1276         * Return a reader as string.
1277         * <p>
1278         *
1279         * @param reader reader to read into a string.
1280         * @param maxLen max content length allowed, if -1 there is no limit.
1281         * @return the reader content.
1282         * @throws IOException thrown if the resource could not be read.
1283         */
1284        private String getReaderAsString(Reader reader, int maxLen) throws IOException {
1285            notNull(reader, "reader");
1286            StringBuffer sb = new StringBuffer();
1287            char[] buffer = new char[2048];
1288            int read;
1289            int count = 0;
1290            while ((read = reader.read(buffer)) > -1) {
1291                count += read;
1292
1293                // read up to maxLen chars;
1294                if ((maxLen > -1) && (count > maxLen)) {
1295                    break;
1296                }
1297                sb.append(buffer, 0, read);
1298            }
1299            reader.close();
1300            return sb.toString();
1301        }
1302    }
1303
1304    private class CoordJobInfo extends ClientCallable<CoordinatorJob> {
1305
1306        CoordJobInfo(String jobId, String filter, int start, int len, String order) {
1307            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1308                    RestConstants.JOB_SHOW_INFO, RestConstants.JOB_FILTER_PARAM, filter, RestConstants.OFFSET_PARAM,
1309                    Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len), RestConstants.ORDER_PARAM,
1310                    order));
1311        }
1312
1313        @Override
1314        protected CoordinatorJob call(HttpURLConnection conn) throws IOException, OozieClientException {
1315            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1316                Reader reader = new InputStreamReader(conn.getInputStream());
1317                JSONObject json = (JSONObject) JSONValue.parse(reader);
1318                return JsonToBean.createCoordinatorJob(json);
1319            }
1320            else {
1321                handleError(conn);
1322            }
1323            return null;
1324        }
1325    }
1326
1327    private class WfsForCoordAction extends ClientCallable<List<WorkflowJob>> {
1328
1329        WfsForCoordAction(String coordActionId) {
1330            super("GET", RestConstants.JOB, notEmpty(coordActionId, "coordActionId"), prepareParams(
1331                    RestConstants.JOB_SHOW_PARAM, RestConstants.ALL_WORKFLOWS_FOR_COORD_ACTION));
1332        }
1333
1334        @Override
1335        protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1336            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1337                Reader reader = new InputStreamReader(conn.getInputStream());
1338                JSONObject json = (JSONObject) JSONValue.parse(reader);
1339                JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS);
1340                if (workflows == null) {
1341                    workflows = new JSONArray();
1342                }
1343                return JsonToBean.createWorkflowJobList(workflows);
1344            }
1345            else {
1346                handleError(conn);
1347            }
1348            return null;
1349        }
1350    }
1351
1352
1353    private class BundleJobInfo extends ClientCallable<BundleJob> {
1354
1355        BundleJobInfo(String jobId) {
1356            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1357                    RestConstants.JOB_SHOW_INFO));
1358        }
1359
1360        @Override
1361        protected BundleJob call(HttpURLConnection conn) throws IOException, OozieClientException {
1362            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1363                Reader reader = new InputStreamReader(conn.getInputStream());
1364                JSONObject json = (JSONObject) JSONValue.parse(reader);
1365                return JsonToBean.createBundleJob(json);
1366            }
1367            else {
1368                handleError(conn);
1369            }
1370            return null;
1371        }
1372    }
1373
1374    private class CoordActionInfo extends ClientCallable<CoordinatorAction> {
1375        CoordActionInfo(String actionId) {
1376            super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1377                    RestConstants.JOB_SHOW_INFO));
1378        }
1379
1380        @Override
1381        protected CoordinatorAction call(HttpURLConnection conn) throws IOException, OozieClientException {
1382            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1383                Reader reader = new InputStreamReader(conn.getInputStream());
1384                JSONObject json = (JSONObject) JSONValue.parse(reader);
1385                return JsonToBean.createCoordinatorAction(json);
1386            }
1387            else {
1388                handleError(conn);
1389            }
1390            return null;
1391        }
1392    }
1393
1394    /**
1395     * Get the info of a bundle job.
1396     *
1397     * @param jobId job Id.
1398     * @return the job info.
1399     * @throws OozieClientException thrown if the job info could not be retrieved.
1400     */
1401    public BundleJob getBundleJobInfo(String jobId) throws OozieClientException {
1402        return new BundleJobInfo(jobId).call();
1403    }
1404
1405    /**
1406     * Get the info of a coordinator job.
1407     *
1408     * @param jobId job Id.
1409     * @return the job info.
1410     * @throws OozieClientException thrown if the job info could not be retrieved.
1411     */
1412    public CoordinatorJob getCoordJobInfo(String jobId) throws OozieClientException {
1413        return new CoordJobInfo(jobId, null, -1, -1, "asc").call();
1414    }
1415
1416    /**
1417     * Get the info of a coordinator job and subset actions.
1418     *
1419     * @param jobId job Id.
1420     * @param filter filter the status filter
1421     * @param start starting index in the list of actions belonging to the job
1422     * @param len number of actions to be returned
1423     * @return the job info.
1424     * @throws OozieClientException thrown if the job info could not be retrieved.
1425     */
1426    public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len)
1427            throws OozieClientException {
1428        return new CoordJobInfo(jobId, filter, start, len, "asc").call();
1429    }
1430
1431    /**
1432     * Get the info of a coordinator job and subset actions.
1433     *
1434     * @param jobId job Id.
1435     * @param filter filter the status filter
1436     * @param start starting index in the list of actions belonging to the job
1437     * @param len number of actions to be returned
1438     * @param order order to list coord actions (e.g, desc)
1439     * @return the job info.
1440     * @throws OozieClientException thrown if the job info could not be retrieved.
1441     */
1442    public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len, String order)
1443            throws OozieClientException {
1444        return new CoordJobInfo(jobId, filter, start, len, order).call();
1445    }
1446
1447    public List<WorkflowJob> getWfsForCoordAction(String coordActionId) throws OozieClientException {
1448        return new WfsForCoordAction(coordActionId).call();
1449    }
1450
1451    /**
1452     * Get the info of a coordinator action.
1453     *
1454     * @param actionId Id.
1455     * @return the coordinator action info.
1456     * @throws OozieClientException thrown if the job info could not be retrieved.
1457     */
1458    public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException {
1459        return new CoordActionInfo(actionId).call();
1460    }
1461
1462    private class JobsStatus extends ClientCallable<List<WorkflowJob>> {
1463
1464        JobsStatus(String filter, int start, int len) {
1465            super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1466                    RestConstants.JOBTYPE_PARAM, "wf", RestConstants.OFFSET_PARAM, Integer.toString(start),
1467                    RestConstants.LEN_PARAM, Integer.toString(len)));
1468        }
1469
1470        @Override
1471        protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1472            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1473            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1474                Reader reader = new InputStreamReader(conn.getInputStream());
1475                JSONObject json = (JSONObject) JSONValue.parse(reader);
1476                JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS);
1477                if (workflows == null) {
1478                    workflows = new JSONArray();
1479                }
1480                return JsonToBean.createWorkflowJobList(workflows);
1481            }
1482            else {
1483                handleError(conn);
1484            }
1485            return null;
1486        }
1487    }
1488
1489    private class CoordJobsStatus extends ClientCallable<List<CoordinatorJob>> {
1490
1491        CoordJobsStatus(String filter, int start, int len) {
1492            super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1493                    RestConstants.JOBTYPE_PARAM, "coord", RestConstants.OFFSET_PARAM, Integer.toString(start),
1494                    RestConstants.LEN_PARAM, Integer.toString(len)));
1495        }
1496
1497        @Override
1498        protected List<CoordinatorJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1499            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1500            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1501                Reader reader = new InputStreamReader(conn.getInputStream());
1502                JSONObject json = (JSONObject) JSONValue.parse(reader);
1503                JSONArray jobs = (JSONArray) json.get(JsonTags.COORDINATOR_JOBS);
1504                if (jobs == null) {
1505                    jobs = new JSONArray();
1506                }
1507                return JsonToBean.createCoordinatorJobList(jobs);
1508            }
1509            else {
1510                handleError(conn);
1511            }
1512            return null;
1513        }
1514    }
1515
1516    private class BundleJobsStatus extends ClientCallable<List<BundleJob>> {
1517
1518        BundleJobsStatus(String filter, int start, int len) {
1519            super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1520                    RestConstants.JOBTYPE_PARAM, "bundle", RestConstants.OFFSET_PARAM, Integer.toString(start),
1521                    RestConstants.LEN_PARAM, Integer.toString(len)));
1522        }
1523
1524        @Override
1525        protected List<BundleJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1526            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1527            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1528                Reader reader = new InputStreamReader(conn.getInputStream());
1529                JSONObject json = (JSONObject) JSONValue.parse(reader);
1530                JSONArray jobs = (JSONArray) json.get(JsonTags.BUNDLE_JOBS);
1531                if (jobs == null) {
1532                    jobs = new JSONArray();
1533                }
1534                return JsonToBean.createBundleJobList(jobs);
1535            }
1536            else {
1537                handleError(conn);
1538            }
1539            return null;
1540        }
1541    }
1542
1543    private class BulkResponseStatus extends ClientCallable<List<BulkResponse>> {
1544
1545        BulkResponseStatus(String filter, int start, int len) {
1546            super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_BULK_PARAM, filter,
1547                    RestConstants.OFFSET_PARAM, Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len)));
1548        }
1549
1550        @Override
1551        protected List<BulkResponse> call(HttpURLConnection conn) throws IOException, OozieClientException {
1552            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1553            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1554                Reader reader = new InputStreamReader(conn.getInputStream());
1555                JSONObject json = (JSONObject) JSONValue.parse(reader);
1556                JSONArray results = (JSONArray) json.get(JsonTags.BULK_RESPONSES);
1557                if (results == null) {
1558                    results = new JSONArray();
1559                }
1560                return JsonToBean.createBulkResponseList(results);
1561            }
1562            else {
1563                handleError(conn);
1564            }
1565            return null;
1566        }
1567    }
1568
1569    private class CoordActionsKill extends ClientCallable<List<CoordinatorAction>> {
1570
1571        CoordActionsKill(String jobId, String rangeType, String scope) {
1572            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1573                    RestConstants.JOB_ACTION_KILL, RestConstants.JOB_COORD_RANGE_TYPE_PARAM, rangeType,
1574                    RestConstants.JOB_COORD_SCOPE_PARAM, scope));
1575        }
1576
1577        @Override
1578        protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException {
1579            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1580            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1581                Reader reader = new InputStreamReader(conn.getInputStream());
1582                JSONObject json = (JSONObject) JSONValue.parse(reader);
1583                JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS);
1584                return JsonToBean.createCoordinatorActionList(coordActions);
1585            }
1586            else {
1587                handleError(conn);
1588            }
1589            return null;
1590        }
1591    }
1592
1593    private class CoordIgnore extends ClientCallable<List<CoordinatorAction>> {
1594        CoordIgnore(String jobId, String rerunType, String scope) {
1595            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1596                    RestConstants.JOB_ACTION_IGNORE, RestConstants.JOB_COORD_RANGE_TYPE_PARAM,
1597                    rerunType, RestConstants.JOB_COORD_SCOPE_PARAM, scope));
1598        }
1599
1600        @Override
1601        protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException {
1602            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1603            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1604                Reader reader = new InputStreamReader(conn.getInputStream());
1605                JSONObject json = (JSONObject) JSONValue.parse(reader);
1606                if(json != null) {
1607                    JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS);
1608                    return JsonToBean.createCoordinatorActionList(coordActions);
1609                }
1610            }
1611            else {
1612                handleError(conn);
1613            }
1614            return null;
1615        }
1616    }
1617    private class CoordRerun extends ClientCallable<List<CoordinatorAction>> {
1618        private final Properties conf;
1619
1620        CoordRerun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup, boolean failed,
1621                   Properties conf) {
1622            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1623                    RestConstants.JOB_COORD_ACTION_RERUN, RestConstants.JOB_COORD_RANGE_TYPE_PARAM, rerunType,
1624                    RestConstants.JOB_COORD_SCOPE_PARAM, scope, RestConstants.JOB_COORD_RERUN_REFRESH_PARAM,
1625                    Boolean.toString(refresh), RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean
1626                            .toString(noCleanup), RestConstants.JOB_COORD_RERUN_FAILED_PARAM, Boolean.toString(failed)));
1627            this.conf = conf;
1628        }
1629
1630        @Override
1631        protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException {
1632            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1633            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1634                Reader reader = new InputStreamReader(conn.getInputStream());
1635                JSONObject json = (JSONObject) JSONValue.parse(reader);
1636                JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS);
1637                return JsonToBean.createCoordinatorActionList(coordActions);
1638            }
1639            else {
1640                handleError(conn);
1641            }
1642            return null;
1643        }
1644    }
1645
1646    private class BundleRerun extends ClientCallable<Void> {
1647
1648        BundleRerun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
1649            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1650                    RestConstants.JOB_BUNDLE_ACTION_RERUN, RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM,
1651                    coordScope, RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM, dateScope,
1652                    RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, Boolean.toString(refresh),
1653                    RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean.toString(noCleanup)));
1654        }
1655
1656        @Override
1657        protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1658            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1659            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1660                return null;
1661            }
1662            else {
1663                handleError(conn);
1664            }
1665            return null;
1666        }
1667    }
1668
1669    /**
1670     * Rerun coordinator actions.
1671     *
1672     * @param jobId coordinator jobId
1673     * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used
1674     * @param scope rerun scope for date or actionIds
1675     * @param refresh true if -refresh is given in command option
1676     * @param noCleanup true if -nocleanup is given in command option
1677     * @throws OozieClientException
1678     */
1679    public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh,
1680            boolean noCleanup) throws OozieClientException {
1681        return new CoordRerun(jobId, rerunType, scope, refresh, noCleanup, false, null).call();
1682    }
1683
1684    /**
1685     * Rerun coordinator actions with failed option.
1686     *
1687     * @param jobId coordinator jobId
1688     * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used
1689     * @param scope rerun scope for date or actionIds
1690     * @param refresh true if -refresh is given in command option
1691     * @param noCleanup true if -nocleanup is given in command option
1692     * @param failed true if -failed is given in command option
1693     * @throws OozieClientException
1694     */
1695    public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh,
1696                                              boolean noCleanup, boolean failed, Properties props) throws OozieClientException {
1697        return new CoordRerun(jobId, rerunType, scope, refresh, noCleanup, failed, props).call();
1698    }
1699
1700    /**
1701     * Rerun bundle coordinators.
1702     *
1703     * @param jobId bundle jobId
1704     * @param coordScope rerun scope for coordinator jobs
1705     * @param dateScope rerun scope for date
1706     * @param refresh true if -refresh is given in command option
1707     * @param noCleanup true if -nocleanup is given in command option
1708     * @throws OozieClientException
1709     */
1710    public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
1711            throws OozieClientException {
1712        return new BundleRerun(jobId, coordScope, dateScope, refresh, noCleanup).call();
1713    }
1714
1715    /**
1716     * Return the info of the workflow jobs that match the filter.
1717     *
1718     * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1719     * @param start jobs offset, base 1.
1720     * @param len number of jobs to return.
1721     * @return a list with the workflow jobs info, without node details.
1722     * @throws OozieClientException thrown if the jobs info could not be retrieved.
1723     */
1724    public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException {
1725        return new JobsStatus(filter, start, len).call();
1726    }
1727
1728    /**
1729     * Return the info of the workflow jobs that match the filter.
1730     * <p>
1731     * It returns the first 100 jobs that match the filter.
1732     *
1733     * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1734     * @return a list with the workflow jobs info, without node details.
1735     * @throws OozieClientException thrown if the jobs info could not be retrieved.
1736     */
1737    public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException {
1738        return getJobsInfo(filter, 1, 50);
1739    }
1740
1741    /**
1742     * Sla enable alert.
1743     *
1744     * @param jobIds the job ids
1745     * @param actions comma separated list of action ids or action id ranges
1746     * @param dates comma separated list of the nominal times
1747     * @throws OozieClientException the oozie client exception
1748     */
1749    public void slaEnableAlert(String jobIds, String actions, String dates) throws OozieClientException {
1750        new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, jobIds, actions, dates, null).call();
1751    }
1752
1753    /**
1754     * Sla enable alert for bundle with coord name/id.
1755     *
1756     * @param bundleId the bundle id
1757     * @param actions comma separated list of action ids or action id ranges
1758     * @param dates comma separated list of the nominal times
1759     * @param coords the coordinators
1760     * @throws OozieClientException the oozie client exception
1761     */
1762    public void slaEnableAlert(String bundleId, String actions, String dates, String coords)
1763            throws OozieClientException {
1764        new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, bundleId, actions, dates, coords).call();
1765    }
1766
1767    /**
1768     * Sla disable alert.
1769     *
1770     * @param jobIds the job ids
1771     * @param actions comma separated list of action ids or action id ranges
1772     * @param dates comma separated list of the nominal times
1773     * @throws OozieClientException the oozie client exception
1774     */
1775    public void slaDisableAlert(String jobIds, String actions, String dates) throws OozieClientException {
1776        new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, jobIds, actions, dates, null).call();
1777    }
1778
1779    /**
1780     * Sla disable alert for bundle with coord name/id.
1781     *
1782     * @param bundleId the bundle id
1783     * @param actions comma separated list of action ids or action id ranges
1784     * @param dates comma separated list of the nominal times
1785     * @param coords the coordinators
1786     * @throws OozieClientException the oozie client exception
1787     */
1788    public void slaDisableAlert(String bundleId, String actions, String dates, String coords)
1789            throws OozieClientException {
1790        new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, bundleId, actions, dates, coords).call();
1791    }
1792
1793    /**
1794     * Sla change definations.
1795     * SLA change definition parameters can be [&lt;key&gt;=&lt;value&gt;,...&lt;key&gt;=&lt;value&gt;]
1796     * Supported parameter key names are should-start, should-end and max-duration
1797     * @param jobIds the job ids
1798     * @param actions comma separated list of action ids or action id ranges.
1799     * @param dates comma separated list of the nominal times
1800     * @param newSlaParams the new sla params
1801     * @throws OozieClientException the oozie client exception
1802     */
1803    public void slaChange(String jobIds, String actions, String dates, String newSlaParams) throws OozieClientException {
1804        new UpdateSLA(RestConstants.SLA_CHANGE, jobIds, actions, dates, null, newSlaParams).call();
1805    }
1806
1807    /**
1808     * Sla change defination for bundle with coord name/id.
1809     * SLA change definition parameters can be [&lt;key&gt;=&lt;value&gt;,...&lt;key&gt;=&lt;value&gt;]
1810     * Supported parameter key names are should-start, should-end and max-duration
1811     * @param bundleId the bundle id
1812     * @param actions comma separated list of action ids or action id ranges
1813     * @param dates comma separated list of the nominal times
1814     * @param coords the coords
1815     * @param newSlaParams the new sla params
1816     * @throws OozieClientException the oozie client exception
1817     */
1818    public void slaChange(String bundleId, String actions, String dates, String coords, String newSlaParams)
1819            throws OozieClientException {
1820        new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, newSlaParams).call();
1821    }
1822
1823    /**
1824     * Sla change with new sla param as hasmap.
1825     * Supported parameter key names are should-start, should-end and max-duration
1826     * @param bundleId the bundle id
1827     * @param actions comma separated list of action ids or action id ranges
1828     * @param dates comma separated list of the nominal times
1829     * @param coords the coords
1830     * @param newSlaParams the new sla params
1831     * @throws OozieClientException the oozie client exception
1832     */
1833    public void slaChange(String bundleId, String actions, String dates, String coords, Map<String, String> newSlaParams)
1834            throws OozieClientException {
1835        new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, mapToString(newSlaParams)).call();
1836    }
1837
1838    /**
1839     * Convert Map to string.
1840     *
1841     * @param map the map
1842     * @return the string
1843     */
1844    private String mapToString(Map<String, String> map) {
1845        StringBuilder sb = new StringBuilder();
1846        Iterator<Entry<String, String>> it = map.entrySet().iterator();
1847        while (it.hasNext()) {
1848            Entry<String, String> e = (Entry<String, String>) it.next();
1849            sb.append(e.getKey()).append("=").append(e.getValue()).append(";");
1850        }
1851        return sb.toString();
1852    }
1853
1854    private class UpdateSLA extends ClientCallable<Void> {
1855
1856        UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords) {
1857            super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"), prepareParams(RestConstants.ACTION_PARAM,
1858                    action, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions, RestConstants.JOB_COORD_SCOPE_DATE,
1859                    dates, RestConstants.COORDINATORS_PARAM, coords));
1860        }
1861
1862        UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords, String newSlaParams) {
1863            super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"), prepareParams(RestConstants.ACTION_PARAM,
1864                    action, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions, RestConstants.JOB_COORD_SCOPE_DATE,
1865                    dates, RestConstants.COORDINATORS_PARAM, coords, RestConstants.JOB_CHANGE_VALUE, newSlaParams));
1866        }
1867
1868        @Override
1869        protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1870            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1871            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1872                System.out.println("Done");
1873            }
1874            else {
1875                handleError(conn);
1876            }
1877            return null;
1878        }
1879    }
1880
1881    /**
1882    * Print sla info about coordinator and workflow jobs and actions.
1883    *
1884    * @param start starting offset
1885    * @param len number of results
1886    * @throws OozieClientException
1887    */
1888        public void getSlaInfo(int start, int len, String filter) throws OozieClientException {
1889            new SlaInfo(start, len, filter).call();
1890        }
1891
1892        private class SlaInfo extends ClientCallable<Void> {
1893
1894            SlaInfo(int start, int len, String filter) {
1895                super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID,
1896                        Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len),
1897                        RestConstants.JOBS_FILTER_PARAM, filter));
1898            }
1899
1900            @Override
1901            protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1902                conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1903                if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1904                    BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
1905                    String line = null;
1906                    while ((line = br.readLine()) != null) {
1907                        System.out.println(line);
1908                    }
1909                }
1910                else {
1911                    handleError(conn);
1912                }
1913                return null;
1914            }
1915        }
1916
1917    private class JobIdAction extends ClientCallable<String> {
1918
1919        JobIdAction(String externalId) {
1920            super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, "wf",
1921                    RestConstants.JOBS_EXTERNAL_ID_PARAM, externalId));
1922        }
1923
1924        @Override
1925        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
1926            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1927                Reader reader = new InputStreamReader(conn.getInputStream());
1928                JSONObject json = (JSONObject) JSONValue.parse(reader);
1929                return (String) json.get(JsonTags.JOB_ID);
1930            }
1931            else {
1932                handleError(conn);
1933            }
1934            return null;
1935        }
1936    }
1937
1938    /**
1939     * Return the workflow job Id for an external Id.
1940     * <p>
1941     * The external Id must have provided at job creation time.
1942     *
1943     * @param externalId external Id given at job creation time.
1944     * @return the workflow job Id for an external Id, <code>null</code> if none.
1945     * @throws OozieClientException thrown if the operation could not be done.
1946     */
1947    public String getJobId(String externalId) throws OozieClientException {
1948        return new JobIdAction(externalId).call();
1949    }
1950
1951    private class SetSystemMode extends ClientCallable<Void> {
1952
1953        public SetSystemMode(SYSTEM_MODE status) {
1954            super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams(
1955                    RestConstants.ADMIN_SYSTEM_MODE_PARAM, status + ""));
1956        }
1957
1958        @Override
1959        public Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1960            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
1961                handleError(conn);
1962            }
1963            return null;
1964        }
1965    }
1966
1967    /**
1968     * Enable or disable safe mode. Used by OozieCLI. In safe mode, Oozie would not accept any commands except status
1969     * command to change and view the safe mode status.
1970     *
1971     * @param status true to enable safe mode, false to disable safe mode.
1972     * @throws OozieClientException if it fails to set the safe mode status.
1973     */
1974    public void setSystemMode(SYSTEM_MODE status) throws OozieClientException {
1975        new SetSystemMode(status).call();
1976    }
1977
1978    private class GetSystemMode extends ClientCallable<SYSTEM_MODE> {
1979
1980        GetSystemMode() {
1981            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams());
1982        }
1983
1984        @Override
1985        protected SYSTEM_MODE call(HttpURLConnection conn) throws IOException, OozieClientException {
1986            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1987                Reader reader = new InputStreamReader(conn.getInputStream());
1988                JSONObject json = (JSONObject) JSONValue.parse(reader);
1989                return SYSTEM_MODE.valueOf((String) json.get(JsonTags.OOZIE_SYSTEM_MODE));
1990            }
1991            else {
1992                handleError(conn);
1993            }
1994            return SYSTEM_MODE.NORMAL;
1995        }
1996    }
1997
1998    /**
1999     * Returns if Oozie is in safe mode or not.
2000     *
2001     * @return true if safe mode is ON<br>
2002     *         false if safe mode is OFF
2003     * @throws OozieClientException throw if it could not obtain the safe mode status.
2004     */
2005    /*
2006     * public boolean isInSafeMode() throws OozieClientException { return new GetSafeMode().call(); }
2007     */
2008    public SYSTEM_MODE getSystemMode() throws OozieClientException {
2009        return new GetSystemMode().call();
2010    }
2011
2012    public String updateShareLib() throws OozieClientException {
2013        return new UpdateSharelib().call();
2014    }
2015
2016    public String listShareLib(String sharelibKey) throws OozieClientException {
2017        return new ListShareLib(sharelibKey).call();
2018    }
2019
2020    private class GetBuildVersion extends ClientCallable<String> {
2021
2022        GetBuildVersion() {
2023            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_BUILD_VERSION_RESOURCE, prepareParams());
2024        }
2025
2026        @Override
2027        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
2028            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
2029                Reader reader = new InputStreamReader(conn.getInputStream());
2030                JSONObject json = (JSONObject) JSONValue.parse(reader);
2031                return (String) json.get(JsonTags.BUILD_VERSION);
2032            }
2033            else {
2034                handleError(conn);
2035            }
2036            return null;
2037        }
2038    }
2039
2040    private class ValidateXML extends ClientCallable<String> {
2041
2042        String file = null;
2043
2044        ValidateXML(String file, String user) {
2045            super("POST", RestConstants.VALIDATE, "",
2046                    prepareParams(RestConstants.FILE_PARAM, file, RestConstants.USER_PARAM, user));
2047            this.file = file;
2048        }
2049
2050        @Override
2051        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
2052            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
2053            if (file.startsWith("/")) {
2054                FileInputStream fi = new FileInputStream(new File(file));
2055                byte[] buffer = new byte[1024];
2056                int n = 0;
2057                while (-1 != (n = fi.read(buffer))) {
2058                    conn.getOutputStream().write(buffer, 0, n);
2059                }
2060            }
2061            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
2062                Reader reader = new InputStreamReader(conn.getInputStream());
2063                JSONObject json = (JSONObject) JSONValue.parse(reader);
2064                return (String) json.get(JsonTags.VALIDATE);
2065            }
2066            else if ((conn.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND)) {
2067                return null;
2068            }
2069            else {
2070                handleError(conn);
2071            }
2072            return null;
2073        }
2074    }
2075
2076
2077    private  class UpdateSharelib extends ClientCallable<String> {
2078
2079        UpdateSharelib() {
2080            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_UPDATE_SHARELIB, prepareParams(
2081                    RestConstants.ALL_SERVER_REQUEST, "true"));
2082        }
2083
2084        @Override
2085        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
2086            StringBuffer bf = new StringBuffer();
2087            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
2088                Reader reader = new InputStreamReader(conn.getInputStream());
2089                Object sharelib = (Object) JSONValue.parse(reader);
2090                bf.append("[ShareLib update status]").append(System.getProperty("line.separator"));
2091                if (sharelib instanceof JSONArray) {
2092                    for (Object o : ((JSONArray) sharelib)) {
2093                        JSONObject obj = (JSONObject) ((JSONObject) o).get(JsonTags.SHARELIB_LIB_UPDATE);
2094                        for (Object key : obj.keySet()) {
2095                            bf.append("\t").append(key).append(" = ").append(obj.get(key))
2096                                    .append(System.getProperty("line.separator"));
2097                        }
2098                        bf.append(System.getProperty("line.separator"));
2099                    }
2100                }
2101                else{
2102                    JSONObject obj = (JSONObject) ((JSONObject) sharelib).get(JsonTags.SHARELIB_LIB_UPDATE);
2103                    for (Object key : obj.keySet()) {
2104                        bf.append("\t").append(key).append(" = ").append(obj.get(key))
2105                                .append(System.getProperty("line.separator"));
2106                    }
2107                    bf.append(System.getProperty("line.separator"));
2108                }
2109                return bf.toString();
2110            }
2111            else {
2112                handleError(conn);
2113            }
2114            return null;
2115        }
2116    }
2117
2118    private class ListShareLib extends ClientCallable<String> {
2119
2120        ListShareLib(String sharelibKey) {
2121            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_LIST_SHARELIB, prepareParams(
2122                    RestConstants.SHARE_LIB_REQUEST_KEY, sharelibKey));
2123        }
2124
2125        @Override
2126        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
2127
2128            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
2129                StringBuffer bf = new StringBuffer();
2130                Reader reader = new InputStreamReader(conn.getInputStream());
2131                JSONObject json = (JSONObject) JSONValue.parse(reader);
2132                Object sharelib = json.get(JsonTags.SHARELIB_LIB);
2133                bf.append("[Available ShareLib]").append(System.getProperty("line.separator"));
2134                if (sharelib instanceof JSONArray) {
2135                    for (Object o : ((JSONArray) sharelib)) {
2136                        JSONObject obj = (JSONObject) o;
2137                        bf.append(obj.get(JsonTags.SHARELIB_LIB_NAME))
2138                                .append(System.getProperty("line.separator"));
2139                        if (obj.get(JsonTags.SHARELIB_LIB_FILES) != null) {
2140                            for (Object file : ((JSONArray) obj.get(JsonTags.SHARELIB_LIB_FILES))) {
2141                                bf.append("\t").append(file).append(System.getProperty("line.separator"));
2142                            }
2143                        }
2144                    }
2145                    return bf.toString();
2146                }
2147            }
2148            else {
2149                handleError(conn);
2150            }
2151            return null;
2152        }
2153
2154    }
2155
2156    /**
2157     * Return the Oozie server build version.
2158     *
2159     * @return the Oozie server build version.
2160     * @throws OozieClientException throw if it the server build version could not be retrieved.
2161     */
2162    public String getServerBuildVersion() throws OozieClientException {
2163        return new GetBuildVersion().call();
2164    }
2165
2166    /**
2167     * Return the Oozie client build version.
2168     *
2169     * @return the Oozie client build version.
2170     */
2171    public String getClientBuildVersion() {
2172        return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION);
2173    }
2174
2175    /**
2176     * Return the workflow application is valid.
2177     *
2178     * @param file local file or hdfs file.
2179     * @return the workflow application is valid.
2180     * @throws OozieClientException throw if it the workflow application's validation could not be retrieved.
2181     */
2182    public String validateXML(String file) throws OozieClientException {
2183        String fileName = file;
2184        if (file.startsWith("file://")) {
2185            fileName = file.substring(7, file.length());
2186        }
2187        if (!fileName.contains("://")) {
2188            File f = new File(fileName);
2189            if (!f.isFile()) {
2190                throw new OozieClientException("File error", "File does not exist : " + f.getAbsolutePath());
2191            }
2192            fileName = f.getAbsolutePath();
2193        }
2194        String user = USER_NAME_TL.get();
2195        if (user == null) {
2196            user = System.getProperty("user.name");
2197        }
2198        return new ValidateXML(fileName, user).call();
2199    }
2200
2201    /**
2202     * Return the info of the coordinator jobs that match the filter.
2203     *
2204     * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
2205     * @param start jobs offset, base 1.
2206     * @param len number of jobs to return.
2207     * @return a list with the coordinator jobs info
2208     * @throws OozieClientException thrown if the jobs info could not be retrieved.
2209     */
2210    public List<CoordinatorJob> getCoordJobsInfo(String filter, int start, int len) throws OozieClientException {
2211        return new CoordJobsStatus(filter, start, len).call();
2212    }
2213
2214    /**
2215     * Return the info of the bundle jobs that match the filter.
2216     *
2217     * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
2218     * @param start jobs offset, base 1.
2219     * @param len number of jobs to return.
2220     * @return a list with the bundle jobs info
2221     * @throws OozieClientException thrown if the jobs info could not be retrieved.
2222     */
2223    public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException {
2224        return new BundleJobsStatus(filter, start, len).call();
2225    }
2226
2227    public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException {
2228        return new BulkResponseStatus(filter, start, len).call();
2229    }
2230
2231    /**
2232     * Poll a job (Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID) and return when it has reached a
2233     * terminal state.
2234     * (i.e. FAILED, KILLED, SUCCEEDED)
2235     *
2236     * @param id The Job ID
     * @param timeout timeout in minutes (values less than 1 indicate no timeout)
2238     * @param interval polling interval in minutes (must be positive)
2239     * @param verbose if true, the current status will be printed out at each poll; if false, no output
2240     * @throws OozieClientException thrown if the job's status could not be retrieved
2241     */
2242    public void pollJob(String id, int timeout, int interval, boolean verbose) throws OozieClientException {
2243        notEmpty("id", id);
2244        if (interval < 1) {
2245            throw new IllegalArgumentException("interval must be a positive integer");
2246        }
2247        boolean noTimeout = (timeout < 1);
2248        long endTime = System.currentTimeMillis() + timeout * 60 * 1000;
2249        interval *= 60 * 1000;
2250
2251        final Set<String> completedStatuses;
2252        if (id.endsWith("-W")) {
2253            completedStatuses = COMPLETED_WF_STATUSES;
2254        } else if (id.endsWith("-C")) {
2255            completedStatuses = COMPLETED_COORD_AND_BUNDLE_STATUSES;
2256        } else if (id.endsWith("-B")) {
2257            completedStatuses = COMPLETED_COORD_AND_BUNDLE_STATUSES;
2258        } else if (id.contains("-C@")) {
2259            completedStatuses = COMPLETED_COORD_ACTION_STATUSES;
2260        } else {
2261            throw new IllegalArgumentException("invalid job type");
2262        }
2263
2264        String status = getStatus(id);
2265        if (verbose) {
2266            System.out.println(status);
2267        }
2268        while(!completedStatuses.contains(status) && (noTimeout || System.currentTimeMillis() <= endTime)) {
2269            try {
2270                Thread.sleep(interval);
2271            } catch (InterruptedException ie) {
2272                // ignore
2273            }
2274            status = getStatus(id);
2275            if (verbose) {
2276                System.out.println(status);
2277            }
2278        }
2279    }
2280
2281    /**
2282     * Gets the status for a particular job (Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID).
2283     *
2284     * @param jobId given jobId
2285     * @return the status
     * @throws OozieClientException thrown if the status could not be retrieved.
2287     */
2288    public String getStatus(String jobId) throws OozieClientException {
2289        return new Status(jobId).call();
2290    }
2291
2292    private class Status extends ClientCallable<String> {
2293
2294        Status(String jobId) {
2295            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
2296                    RestConstants.JOB_SHOW_STATUS));
2297        }
2298
2299        @Override
2300        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
2301            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
2302                Reader reader = new InputStreamReader(conn.getInputStream());
2303                JSONObject json = (JSONObject) JSONValue.parse(reader);
2304                return (String) json.get(JsonTags.STATUS);
2305            }
2306            else {
2307                handleError(conn);
2308            }
2309            return null;
2310        }
2311    }
2312
2313    private class GetQueueDump extends ClientCallable<List<String>> {
2314        GetQueueDump() {
2315            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams());
2316        }
2317
2318        @Override
2319        protected List<String> call(HttpURLConnection conn) throws IOException, OozieClientException {
2320            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
2321                Reader reader = new InputStreamReader(conn.getInputStream());
2322                JSONObject json = (JSONObject) JSONValue.parse(reader);
2323                JSONArray queueDumpArray = (JSONArray) json.get(JsonTags.QUEUE_DUMP);
2324
2325                List<String> list = new ArrayList<String>();
2326                list.add("[Server Queue Dump]:");
2327                for (Object o : queueDumpArray) {
2328                    JSONObject entry = (JSONObject) o;
2329                    if (entry.get(JsonTags.CALLABLE_DUMP) != null) {
2330                        String value = (String) entry.get(JsonTags.CALLABLE_DUMP);
2331                        list.add(value);
2332                    }
2333                }
2334                if (queueDumpArray.size() == 0) {
2335                    list.add("Queue dump is null!");
2336                }
2337
2338                list.add("******************************************");
2339                list.add("[Server Uniqueness Map Dump]:");
2340
2341                JSONArray uniqueDumpArray = (JSONArray) json.get(JsonTags.UNIQUE_MAP_DUMP);
2342                for (Object o : uniqueDumpArray) {
2343                    JSONObject entry = (JSONObject) o;
2344                    if (entry.get(JsonTags.UNIQUE_ENTRY_DUMP) != null) {
2345                        String value = (String) entry.get(JsonTags.UNIQUE_ENTRY_DUMP);
2346                        list.add(value);
2347                    }
2348                }
2349                if (uniqueDumpArray.size() == 0) {
2350                    list.add("Uniqueness dump is null!");
2351                }
2352                return list;
2353            }
2354            else {
2355                handleError(conn);
2356            }
2357            return null;
2358        }
2359    }
2360
2361    /**
2362     * Return the Oozie queue's commands' dump
2363     *
2364     * @return the list of strings of callable identification in queue
     * @throws OozieClientException thrown if the queue dump could not be retrieved.
2366     */
2367    public List<String> getQueueDump() throws OozieClientException {
2368        return new GetQueueDump().call();
2369    }
2370
2371    private class GetAvailableOozieServers extends MapClientCallable {
2372
2373        GetAvailableOozieServers() {
2374            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_AVAILABLE_OOZIE_SERVERS_RESOURCE, prepareParams());
2375        }
2376    }
2377
2378    /**
2379     * Return the list of available Oozie servers.
2380     *
2381     * @return the list of available Oozie servers.
     * @throws OozieClientException thrown if the list of available Oozie servers could not be retrieved.
2383     */
2384    public Map<String, String> getAvailableOozieServers() throws OozieClientException {
2385        return new GetAvailableOozieServers().call();
2386    }
2387
2388    private class GetServerConfiguration extends MapClientCallable {
2389
2390        GetServerConfiguration() {
2391            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_CONFIG_RESOURCE, prepareParams());
2392        }
2393    }
2394
2395    /**
2396     * Return the Oozie system configuration.
2397     *
2398     * @return the Oozie system configuration.
     * @throws OozieClientException thrown if the system configuration could not be retrieved.
2400     */
2401    public Map<String, String> getServerConfiguration() throws OozieClientException {
2402        return new GetServerConfiguration().call();
2403    }
2404
2405    private class GetJavaSystemProperties extends MapClientCallable {
2406
2407        GetJavaSystemProperties() {
2408            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JAVA_SYS_PROPS_RESOURCE, prepareParams());
2409        }
2410    }
2411
2412    /**
2413     * Return the Oozie Java system properties.
2414     *
2415     * @return the Oozie Java system properties.
     * @throws OozieClientException thrown if the system properties could not be retrieved.
2417     */
2418    public Map<String, String> getJavaSystemProperties() throws OozieClientException {
2419        return new GetJavaSystemProperties().call();
2420    }
2421
2422    private class GetOSEnv extends MapClientCallable {
2423
2424        GetOSEnv() {
2425            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_OS_ENV_RESOURCE, prepareParams());
2426        }
2427    }
2428
2429    /**
2430     * Return the Oozie system OS environment.
2431     *
2432     * @return the Oozie system OS environment.
     * @throws OozieClientException thrown if the system OS environment could not be retrieved.
2434     */
2435    public Map<String, String> getOSEnv() throws OozieClientException {
2436        return new GetOSEnv().call();
2437    }
2438
2439    private class GetMetrics extends ClientCallable<Metrics> {
2440
2441        GetMetrics() {
2442            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_METRICS_RESOURCE, prepareParams());
2443        }
2444
2445        @Override
2446        protected Metrics call(HttpURLConnection conn) throws IOException, OozieClientException {
2447            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
2448                Reader reader = new InputStreamReader(conn.getInputStream());
2449                JSONObject json = (JSONObject) JSONValue.parse(reader);
2450                Metrics metrics = new Metrics(json);
2451                return metrics;
2452            }
2453            else if ((conn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE)) {
2454                // Use Instrumentation endpoint
2455                return null;
2456            }
2457            else {
2458                handleError(conn);
2459            }
2460            return null;
2461        }
2462    }
2463
2464    public class Metrics {
2465        private Map<String, Long> counters;
2466        private Map<String, Object> gauges;
2467        private Map<String, Timer> timers;
2468        private Map<String, Histogram> histograms;
2469
2470        @SuppressWarnings("unchecked")
2471        public Metrics(JSONObject json) {
2472            JSONObject jCounters = (JSONObject) json.get("counters");
2473            counters = new HashMap<String, Long>(jCounters.size());
2474            for (Object entO : jCounters.entrySet()) {
2475                Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO;
2476                counters.put(ent.getKey(), (Long)ent.getValue().get("count"));
2477            }
2478
2479            JSONObject jGuages = (JSONObject) json.get("gauges");
2480            gauges = new HashMap<String, Object>(jGuages.size());
2481            for (Object entO : jGuages.entrySet()) {
2482                Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO;
2483                gauges.put(ent.getKey(), ent.getValue().get("value"));
2484            }
2485
2486            JSONObject jTimers = (JSONObject) json.get("timers");
2487            timers = new HashMap<String, Timer>(jTimers.size());
2488            for (Object entO : jTimers.entrySet()) {
2489                Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO;
2490                timers.put(ent.getKey(), new Timer(ent.getValue()));
2491            }
2492
2493            JSONObject jHistograms = (JSONObject) json.get("histograms");
2494            histograms = new HashMap<String, Histogram>(jHistograms.size());
2495            for (Object entO : jHistograms.entrySet()) {
2496                Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO;
2497                histograms.put(ent.getKey(), new Histogram(ent.getValue()));
2498            }
2499        }
2500
2501        public Map<String, Long> getCounters() {
2502            return counters;
2503        }
2504
2505        public Map<String, Object> getGauges() {
2506            return gauges;
2507        }
2508
2509        public Map<String, Timer> getTimers() {
2510            return timers;
2511        }
2512
2513        public Map<String, Histogram> getHistograms() {
2514            return histograms;
2515        }
2516
2517        public class Timer extends Histogram {
2518            private double m15Rate;
2519            private double m5Rate;
2520            private double m1Rate;
2521            private double meanRate;
2522            private String durationUnits;
2523            private String rateUnits;
2524
2525            public Timer(JSONObject json) {
2526                super(json);
2527                m15Rate = Double.valueOf(json.get("m15_rate").toString());
2528                m5Rate = Double.valueOf(json.get("m5_rate").toString());
2529                m1Rate = Double.valueOf(json.get("m1_rate").toString());
2530                meanRate = Double.valueOf(json.get("mean_rate").toString());
2531                durationUnits = json.get("duration_units").toString();
2532                rateUnits = json.get("rate_units").toString();
2533            }
2534
2535            public double get15MinuteRate() {
2536                return m15Rate;
2537            }
2538
2539            public double get5MinuteRate() {
2540                return m5Rate;
2541            }
2542
2543            public double get1MinuteRate() {
2544                return m1Rate;
2545            }
2546
2547            public double getMeanRate() {
2548                return meanRate;
2549            }
2550
2551            public String getDurationUnits() {
2552                return durationUnits;
2553            }
2554
2555            public String getRateUnits() {
2556                return rateUnits;
2557            }
2558
2559            @Override
2560            public String toString() {
2561                StringBuilder sb = new StringBuilder(super.toString());
2562                sb.append("\n\t15 minute rate : ").append(m15Rate);
2563                sb.append("\n\t5 minute rate : ").append(m5Rate);
2564                sb.append("\n\t1 minute rate : ").append(m15Rate);
2565                sb.append("\n\tmean rate : ").append(meanRate);
2566                sb.append("\n\tduration units : ").append(durationUnits);
2567                sb.append("\n\trate units : ").append(rateUnits);
2568                return sb.toString();
2569            }
2570        }
2571
2572        public class Histogram {
2573            private double p999;
2574            private double p99;
2575            private double p98;
2576            private double p95;
2577            private double p75;
2578            private double p50;
2579            private double mean;
2580            private double min;
2581            private double max;
2582            private double stdDev;
2583            private long count;
2584
2585            public Histogram(JSONObject json) {
2586                p999 = Double.valueOf(json.get("p999").toString());
2587                p99 = Double.valueOf(json.get("p99").toString());
2588                p98 = Double.valueOf(json.get("p98").toString());
2589                p95 = Double.valueOf(json.get("p95").toString());
2590                p75 = Double.valueOf(json.get("p75").toString());
2591                p50 = Double.valueOf(json.get("p50").toString());
2592                mean = Double.valueOf(json.get("mean").toString());
2593                min = Double.valueOf(json.get("min").toString());
2594                max = Double.valueOf(json.get("max").toString());
2595                stdDev = Double.valueOf(json.get("stddev").toString());
2596                count = Long.valueOf(json.get("count").toString());
2597            }
2598
2599            public double get999thPercentile() {
2600                return p999;
2601            }
2602
2603            public double get99thPercentile() {
2604                return p99;
2605            }
2606
2607            public double get98thPercentile() {
2608                return p98;
2609            }
2610
2611            public double get95thPercentile() {
2612                return p95;
2613            }
2614
2615            public double get75thPercentile() {
2616                return p75;
2617            }
2618
2619            public double get50thPercentile() {
2620                return p50;
2621            }
2622
2623            public double getMean() {
2624                return mean;
2625            }
2626
2627            public double getMin() {
2628                return min;
2629            }
2630
2631            public double getMax() {
2632                return max;
2633            }
2634
2635            public double getStandardDeviation() {
2636                return stdDev;
2637            }
2638
2639            public long getCount() {
2640                return count;
2641            }
2642
2643            @Override
2644            public String toString() {
2645                StringBuilder sb = new StringBuilder();
2646                sb.append("\t999th percentile : ").append(p999);
2647                sb.append("\n\t99th percentile : ").append(p99);
2648                sb.append("\n\t98th percentile : ").append(p98);
2649                sb.append("\n\t95th percentile : ").append(p95);
2650                sb.append("\n\t75th percentile : ").append(p75);
2651                sb.append("\n\t50th percentile : ").append(p50);
2652                sb.append("\n\tmean : ").append(mean);
2653                sb.append("\n\tmax : ").append(max);
2654                sb.append("\n\tmin : ").append(min);
2655                sb.append("\n\tcount : ").append(count);
2656                sb.append("\n\tstandard deviation : ").append(stdDev);
2657                return sb.toString();
2658            }
2659        }
2660    }
2661
2662    /**
2663     * Return the Oozie metrics.  If null is returned, then try {@link #getInstrumentation()}.
2664     *
2665     * @return the Oozie metrics or null.
     * @throws OozieClientException thrown if the metrics could not be retrieved.
2667     */
2668    public Metrics getMetrics() throws OozieClientException {
2669        return new GetMetrics().call();
2670    }
2671
2672    private class GetInstrumentation extends ClientCallable<Instrumentation> {
2673
2674        GetInstrumentation() {
2675            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_INSTRUMENTATION_RESOURCE, prepareParams());
2676        }
2677
2678        @Override
2679        protected Instrumentation call(HttpURLConnection conn) throws IOException, OozieClientException {
2680            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
2681                Reader reader = new InputStreamReader(conn.getInputStream());
2682                JSONObject json = (JSONObject) JSONValue.parse(reader);
2683                Instrumentation instrumentation = new Instrumentation(json);
2684                return instrumentation;
2685            }
2686            else if ((conn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE)) {
2687                // Use Metrics endpoint
2688                return null;
2689            }
2690            else {
2691                handleError(conn);
2692            }
2693            return null;
2694        }
2695    }
2696
2697    public class Instrumentation {
2698        private Map<String, Long> counters;
2699        private Map<String, Object> variables;
2700        private Map<String, Double> samplers;
2701        private Map<String, Timer> timers;
2702
2703        public Instrumentation(JSONObject json) {
2704            JSONArray jCounters = (JSONArray) json.get("counters");
2705            counters = new HashMap<String, Long>(jCounters.size());
2706            for (Object groupO : jCounters) {
2707                JSONObject group = (JSONObject) groupO;
2708                String groupName = group.get("group").toString() + ".";
2709                JSONArray data = (JSONArray) group.get("data");
2710                for (Object datO : data) {
2711                    JSONObject dat = (JSONObject) datO;
2712                    counters.put(groupName + dat.get("name").toString(), Long.valueOf(dat.get("value").toString()));
2713                }
2714            }
2715
2716            JSONArray jVariables = (JSONArray) json.get("variables");
2717            variables = new HashMap<String, Object>(jVariables.size());
2718            for (Object groupO : jVariables) {
2719                JSONObject group = (JSONObject) groupO;
2720                String groupName = group.get("group").toString() + ".";
2721                JSONArray data = (JSONArray) group.get("data");
2722                for (Object datO : data) {
2723                    JSONObject dat = (JSONObject) datO;
2724                    variables.put(groupName + dat.get("name").toString(), dat.get("value"));
2725                }
2726            }
2727
2728            JSONArray jSamplers = (JSONArray) json.get("samplers");
2729            samplers = new HashMap<String, Double>(jSamplers.size());
2730            for (Object groupO : jSamplers) {
2731                JSONObject group = (JSONObject) groupO;
2732                String groupName = group.get("group").toString() + ".";
2733                JSONArray data = (JSONArray) group.get("data");
2734                for (Object datO : data) {
2735                    JSONObject dat = (JSONObject) datO;
2736                    samplers.put(groupName + dat.get("name").toString(), Double.valueOf(dat.get("value").toString()));
2737                }
2738            }
2739
2740            JSONArray jTimers = (JSONArray) json.get("timers");
2741            timers = new HashMap<String, Timer>(jTimers.size());
2742            for (Object groupO : jTimers) {
2743                JSONObject group = (JSONObject) groupO;
2744                String groupName = group.get("group").toString() + ".";
2745                JSONArray data = (JSONArray) group.get("data");
2746                for (Object datO : data) {
2747                    JSONObject dat = (JSONObject) datO;
2748                    timers.put(groupName + dat.get("name").toString(), new Timer(dat));
2749                }
2750            }
2751        }
2752
2753        public class Timer {
2754            private double ownTimeStdDev;
2755            private long ownTimeAvg;
2756            private long ownMaxTime;
2757            private long ownMinTime;
2758            private double totalTimeStdDev;
2759            private long totalTimeAvg;
2760            private long totalMaxTime;
2761            private long totalMinTime;
2762            private long ticks;
2763
2764            public Timer(JSONObject json) {
2765                ownTimeStdDev = Double.valueOf(json.get("ownTimeStdDev").toString());
2766                ownTimeAvg = Long.valueOf(json.get("ownTimeAvg").toString());
2767                ownMaxTime = Long.valueOf(json.get("ownMaxTime").toString());
2768                ownMinTime = Long.valueOf(json.get("ownMinTime").toString());
2769                totalTimeStdDev = Double.valueOf(json.get("totalTimeStdDev").toString());
2770                totalTimeAvg = Long.valueOf(json.get("totalTimeAvg").toString());
2771                totalMaxTime = Long.valueOf(json.get("totalMaxTime").toString());
2772                totalMinTime = Long.valueOf(json.get("totalMinTime").toString());
2773                ticks = Long.valueOf(json.get("ticks").toString());
2774            }
2775
2776            public double getOwnTimeStandardDeviation() {
2777                return ownTimeStdDev;
2778            }
2779
2780            public long getOwnTimeAverage() {
2781                return ownTimeAvg;
2782            }
2783
2784            public long getOwnMaxTime() {
2785                return ownMaxTime;
2786            }
2787
2788            public long getOwnMinTime() {
2789                return ownMinTime;
2790            }
2791
2792            public double getTotalTimeStandardDeviation() {
2793                return totalTimeStdDev;
2794            }
2795
2796            public long getTotalTimeAverage() {
2797                return totalTimeAvg;
2798            }
2799
2800            public long getTotalMaxTime() {
2801                return totalMaxTime;
2802            }
2803
2804            public long getTotalMinTime() {
2805                return totalMinTime;
2806            }
2807
2808            public long getTicks() {
2809                return ticks;
2810            }
2811
2812            @Override
2813            public String toString() {
2814                StringBuilder sb = new StringBuilder();
2815                sb.append("\town time standard deviation : ").append(ownTimeStdDev);
2816                sb.append("\n\town average time : ").append(ownTimeAvg);
2817                sb.append("\n\town max time : ").append(ownMaxTime);
2818                sb.append("\n\town min time : ").append(ownMinTime);
2819                sb.append("\n\ttotal time standard deviation : ").append(totalTimeStdDev);
2820                sb.append("\n\ttotal average time : ").append(totalTimeAvg);
2821                sb.append("\n\ttotal max time : ").append(totalMaxTime);
2822                sb.append("\n\ttotal min time : ").append(totalMinTime);
2823                sb.append("\n\tticks : ").append(ticks);
2824                return sb.toString();
2825            }
2826        }
2827
2828        public Map<String, Long> getCounters() {
2829            return counters;
2830        }
2831
2832        public Map<String, Object> getVariables() {
2833            return variables;
2834        }
2835
2836        public Map<String, Double> getSamplers() {
2837            return samplers;
2838        }
2839
2840        public Map<String, Timer> getTimers() {
2841            return timers;
2842        }
2843    }
2844
2845    /**
2846     * Return the Oozie instrumentation.  If null is returned, then try {@link #getMetrics()}.
2847     *
     * @return the Oozie instrumentation or null.
     * @throws OozieClientException thrown if the instrumentation could not be retrieved.
2850     */
2851    public Instrumentation getInstrumentation() throws OozieClientException {
2852        return new GetInstrumentation().call();
2853    }
2854
2855    /**
2856     * Check if the string is not null or not empty.
2857     *
2858     * @param str
2859     * @param name
2860     * @return string
2861     */
2862    public static String notEmpty(String str, String name) {
2863        if (str == null) {
2864            throw new IllegalArgumentException(name + " cannot be null");
2865        }
2866        if (str.length() == 0) {
2867            throw new IllegalArgumentException(name + " cannot be empty");
2868        }
2869        return str;
2870    }
2871
2872    /**
2873     * Check if the object is not null.
2874     *
2875     * @param <T>
2876     * @param obj
2877     * @param name
2878     * @return string
2879     */
2880    public static <T> T notNull(T obj, String name) {
2881        if (obj == null) {
2882            throw new IllegalArgumentException(name + " cannot be null");
2883        }
2884        return obj;
2885    }
2886
2887}