This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.client;
019
020 import java.io.BufferedReader;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import java.io.InputStreamReader;
024 import java.io.OutputStream;
025 import java.io.PrintStream;
026 import java.io.Reader;
027 import java.net.HttpURLConnection;
028 import java.net.URL;
029 import java.net.URLEncoder;
030 import java.util.ArrayList;
031 import java.util.Collections;
032 import java.util.HashMap;
033 import java.util.Iterator;
034 import java.util.LinkedHashMap;
035 import java.util.List;
036 import java.util.Map;
037 import java.util.Properties;
038 import java.util.concurrent.Callable;
039
040 import javax.xml.parsers.DocumentBuilderFactory;
041 import javax.xml.transform.Transformer;
042 import javax.xml.transform.TransformerFactory;
043 import javax.xml.transform.dom.DOMSource;
044 import javax.xml.transform.stream.StreamResult;
045
046 import org.apache.oozie.BuildInfo;
047 import org.apache.oozie.client.rest.JsonTags;
048 import org.apache.oozie.client.rest.JsonToBean;
049 import org.apache.oozie.client.rest.RestConstants;
050 import org.json.simple.JSONArray;
051 import org.json.simple.JSONObject;
052 import org.json.simple.JSONValue;
053 import org.w3c.dom.Document;
054 import org.w3c.dom.Element;
055
056 /**
057 * Client API to submit and manage Oozie workflow jobs against an Oozie instance.
058 * <p/>
059 * This class is thread safe.
060 * <p/>
061 * Syntax for filter for the {@link #getJobsInfo(String)} {@link #getJobsInfo(String, int, int)} methods:
062 * <code>[NAME=VALUE][;NAME=VALUE]*</code>.
063 * <p/>
064 * Valid filter names are:
065 * <p/>
066 * <ul>
067 * <li>name: the workflow application name from the workflow definition.</li>
068 * <li>user: the user that submitted the job.</li>
069 * <li>group: the group for the job.</li>
070 * <li>status: the status of the job.</li>
071 * </ul>
072 * <p/>
073 * The query will do an AND among all the filter names. The query will do an OR among all the filter values for the same
074 * name. Multiple values must be specified as different name value pairs.
075 */
076 public class OozieClient {
077
078 public static final long WS_PROTOCOL_VERSION_0 = 0;
079
080 public static final long WS_PROTOCOL_VERSION_1 = 1;
081
082 public static final long WS_PROTOCOL_VERSION = 2; // pointer to current version
083
084 public static final String USER_NAME = "user.name";
085
086 @Deprecated
087 public static final String GROUP_NAME = "group.name";
088
089 public static final String JOB_ACL = "oozie.job.acl";
090
091 public static final String APP_PATH = "oozie.wf.application.path";
092
093 public static final String COORDINATOR_APP_PATH = "oozie.coord.application.path";
094
095 public static final String BUNDLE_APP_PATH = "oozie.bundle.application.path";
096
097 public static final String BUNDLE_ID = "oozie.bundle.id";
098
099 public static final String EXTERNAL_ID = "oozie.wf.external.id";
100
101 public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url";
102
103 public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url";
104
105 public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url";
106
107 public static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes";
108
109 public static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes";
110
111 public static final String LOG_TOKEN = "oozie.wf.log.token";
112
113 public static final String ACTION_MAX_RETRIES = "oozie.wf.action.max.retries";
114
115 public static final String ACTION_RETRY_INTERVAL = "oozie.wf.action.retry.interval";
116
117 public static final String FILTER_USER = "user";
118
119 public static final String FILTER_GROUP = "group";
120
121 public static final String FILTER_NAME = "name";
122
123 public static final String FILTER_STATUS = "status";
124
125 public static final String FILTER_FREQUENCY = "frequency";
126
127 public static final String FILTER_ID = "id";
128
129 public static final String FILTER_UNIT = "unit";
130
131 public static final String FILTER_JOBID = "jobid";
132
133 public static final String FILTER_APPNAME = "appname";
134
135 public static final String FILTER_SLA_APPNAME = "app_name";
136
137 public static final String FILTER_SLA_ID = "id";
138
139 public static final String FILTER_SLA_PARENT_ID = "parent_id";
140
141 public static final String FILTER_SLA_NOMINAL_START = "nominal_start";
142
143 public static final String FILTER_SLA_NOMINAL_END = "nominal_end";
144
145 public static final String CHANGE_VALUE_ENDTIME = "endtime";
146
147 public static final String CHANGE_VALUE_PAUSETIME = "pausetime";
148
149 public static final String CHANGE_VALUE_CONCURRENCY = "concurrency";
150
151 public static final String LIBPATH = "oozie.libpath";
152
153 public static final String USE_SYSTEM_LIBPATH = "oozie.use.system.libpath";
154
155 public static final String OOZIE_SUSPEND_ON_NODES = "oozie.suspend.on.nodes";
156
/**
 * System modes, declared in the order NORMAL, NOWEBSERVICE, SAFEMODE.
 */
public enum SYSTEM_MODE {
    NORMAL,
    NOWEBSERVICE,
    SAFEMODE
}
160
161 /**
162 * debugMode =0 means no debugging. > 0 means debugging on.
163 */
164 public int debugMode = 0;
165
166 private String baseUrl;
167 private String protocolUrl;
168 private boolean validatedVersion = false;
169 private JSONArray supportedVersions;
170 private final Map<String, String> headers = new HashMap<String, String>();
171
172 private static ThreadLocal<String> USER_NAME_TL = new ThreadLocal<String>();
173
174 /**
175 * Allows to impersonate other users in the Oozie server. The current user
176 * must be configured as a proxyuser in Oozie.
177 * <p/>
178 * IMPORTANT: impersonation happens only with Oozie client requests done within
179 * doAs() calls.
180 *
181 * @param userName user to impersonate.
182 * @param callable callable with {@link OozieClient} calls impersonating the specified user.
183 * @return any response returned by the {@link Callable#call()} method.
184 * @throws Exception thrown by the {@link Callable#call()} method.
185 */
186 public static <T> T doAs(String userName, Callable<T> callable) throws Exception {
187 notEmpty(userName, "userName");
188 notNull(callable, "callable");
189 try {
190 USER_NAME_TL.set(userName);
191 return callable.call();
192 }
193 finally {
194 USER_NAME_TL.remove();
195 }
196 }
197
/**
 * No-argument constructor for subclasses; it leaves the base URL unset.
 */
protected OozieClient() {
    // intentionally empty
}
200
/**
 * Create a Workflow client instance.
 * <p/>
 * The URL is stored with a trailing "/" (one is appended when missing).
 *
 * @param oozieUrl URL of the Oozie instance it will interact with.
 */
public OozieClient(String oozieUrl) {
    String url = notEmpty(oozieUrl, "oozieUrl");
    this.baseUrl = url.endsWith("/") ? url : url + "/";
}
212
213 /**
214 * Return the Oozie URL of the workflow client instance.
215 * <p/>
216 * This URL is the base URL fo the Oozie system, with not protocol versioning.
217 *
218 * @return the Oozie URL of the workflow client instance.
219 */
220 public String getOozieUrl() {
221 return baseUrl;
222 }
223
224 /**
225 * Return the Oozie URL used by the client and server for WS communications.
226 * <p/>
227 * This URL is the original URL plus the versioning element path.
228 *
229 * @return the Oozie URL used by the client and server for communication.
230 * @throws OozieClientException thrown in the client and the server are not protocol compatible.
231 */
232 public String getProtocolUrl() throws OozieClientException {
233 validateWSVersion();
234 return protocolUrl;
235 }
236
237 /**
238 * @return current debug Mode
239 */
240 public int getDebugMode() {
241 return debugMode;
242 }
243
244 /**
245 * Set debug mode.
246 *
247 * @param debugMode : 0 means no debugging. > 0 means debugging
248 */
249 public void setDebugMode(int debugMode) {
250 this.debugMode = debugMode;
251 }
252
253 private String getBaseURLForVersion(long protocolVersion) throws OozieClientException {
254 try {
255 if (supportedVersions == null) {
256 supportedVersions = getSupportedProtocolVersions();
257 }
258 if (supportedVersions == null) {
259 throw new OozieClientException("HTTP error", "no response message");
260 }
261 if (supportedVersions.contains(protocolVersion)) {
262 return baseUrl + "v" + protocolVersion + "/";
263 }
264 else {
265 throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, "Protocol version "
266 + protocolVersion + " is not supported");
267 }
268 }
269 catch (IOException e) {
270 throw new OozieClientException(OozieClientException.IO_ERROR, e);
271 }
272 }
273
274 /**
275 * Validate that the Oozie client and server instances are protocol compatible.
276 *
277 * @throws OozieClientException thrown in the client and the server are not protocol compatible.
278 */
279 public synchronized void validateWSVersion() throws OozieClientException {
280 if (!validatedVersion) {
281 try {
282 supportedVersions = getSupportedProtocolVersions();
283 if (supportedVersions == null) {
284 throw new OozieClientException("HTTP error", "no response message");
285 }
286 if (!supportedVersions.contains(WS_PROTOCOL_VERSION)
287 && !supportedVersions.contains(WS_PROTOCOL_VERSION_1)
288 && !supportedVersions.contains(WS_PROTOCOL_VERSION_0)) {
289 StringBuilder msg = new StringBuilder();
290 msg.append("Supported version [").append(WS_PROTOCOL_VERSION)
291 .append("] or less, Unsupported versions[");
292 String separator = "";
293 for (Object version : supportedVersions) {
294 msg.append(separator).append(version);
295 }
296 msg.append("]");
297 throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, msg.toString());
298 }
299 if (supportedVersions.contains(WS_PROTOCOL_VERSION)) {
300 protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION + "/";
301 }
302 else if (supportedVersions.contains(WS_PROTOCOL_VERSION_1)) {
303 protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_1 + "/";
304 }
305 else {
306 if (supportedVersions.contains(WS_PROTOCOL_VERSION_0)) {
307 protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_0 + "/";
308 }
309 }
310 }
311 catch (IOException ex) {
312 throw new OozieClientException(OozieClientException.IO_ERROR, ex);
313 }
314 validatedVersion = true;
315 }
316 }
317
318 private JSONArray getSupportedProtocolVersions() throws IOException, OozieClientException {
319 JSONArray versions = null;
320 URL url = new URL(baseUrl + RestConstants.VERSIONS);
321 HttpURLConnection conn = createConnection(url, "GET");
322 if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
323 versions = (JSONArray) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
324 }
325 else {
326 handleError(conn);
327 }
328 return versions;
329 }
330
331 /**
332 * Create an empty configuration with just the {@link #USER_NAME} set to the JVM user name.
333 *
334 * @return an empty configuration.
335 */
336 public Properties createConfiguration() {
337 Properties conf = new Properties();
338 String userName = USER_NAME_TL.get();
339 if (userName == null) {
340 userName = System.getProperty("user.name");
341 }
342 conf.setProperty(USER_NAME, userName);
343 return conf;
344 }
345
346 /**
347 * Set a HTTP header to be used in the WS requests by the workflow instance.
348 *
349 * @param name header name.
350 * @param value header value.
351 */
352 public void setHeader(String name, String value) {
353 headers.put(notEmpty(name, "name"), notNull(value, "value"));
354 }
355
356 /**
357 * Get the value of a set HTTP header from the workflow instance.
358 *
359 * @param name header name.
360 * @return header value, <code>null</code> if not set.
361 */
362 public String getHeader(String name) {
363 return headers.get(notEmpty(name, "name"));
364 }
365
366 /**
367 * Get the set HTTP header
368 *
369 * @return map of header key and value
370 */
371 public Map<String, String> getHeaders() {
372 return headers;
373 }
374
375 /**
376 * Remove a HTTP header from the workflow client instance.
377 *
378 * @param name header name.
379 */
380 public void removeHeader(String name) {
381 headers.remove(notEmpty(name, "name"));
382 }
383
384 /**
385 * Return an iterator with all the header names set in the workflow instance.
386 *
387 * @return header names.
388 */
389 public Iterator<String> getHeaderNames() {
390 return Collections.unmodifiableMap(headers).keySet().iterator();
391 }
392
393 private URL createURL(Long protocolVersion, String collection, String resource, Map<String, String> parameters)
394 throws IOException, OozieClientException {
395 validateWSVersion();
396 StringBuilder sb = new StringBuilder();
397 if (protocolVersion == null) {
398 sb.append(protocolUrl);
399 }
400 else {
401 sb.append(getBaseURLForVersion(protocolVersion));
402 }
403 sb.append(collection);
404 if (resource != null && resource.length() > 0) {
405 sb.append("/").append(resource);
406 }
407 if (parameters.size() > 0) {
408 String separator = "?";
409 for (Map.Entry<String, String> param : parameters.entrySet()) {
410 if (param.getValue() != null) {
411 sb.append(separator).append(URLEncoder.encode(param.getKey(), "UTF-8")).append("=").append(
412 URLEncoder.encode(param.getValue(), "UTF-8"));
413 separator = "&";
414 }
415 }
416 }
417 return new URL(sb.toString());
418 }
419
420 private boolean validateCommand(String url) {
421 {
422 if (protocolUrl.contains(baseUrl + "v0")) {
423 if (url.contains("dryrun") || url.contains("jobtype=c") || url.contains("systemmode")) {
424 return false;
425 }
426 }
427 }
428 return true;
429 }
430
431 /**
432 * Create http connection to oozie server.
433 *
434 * @param url
435 * @param method
436 * @return connection
437 * @throws IOException
438 * @throws OozieClientException
439 */
440 protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException {
441 HttpURLConnection conn = (HttpURLConnection) url.openConnection();
442 conn.setRequestMethod(method);
443 if (method.equals("POST") || method.equals("PUT")) {
444 conn.setDoOutput(true);
445 }
446 for (Map.Entry<String, String> header : headers.entrySet()) {
447 conn.setRequestProperty(header.getKey(), header.getValue());
448 }
449 return conn;
450 }
451
452 protected abstract class ClientCallable<T> implements Callable<T> {
453 private final String method;
454 private final String collection;
455 private final String resource;
456 private final Map<String, String> params;
457 private final Long protocolVersion;
458
459 public ClientCallable(String method, String collection, String resource, Map<String, String> params) {
460 this(method, null, collection, resource, params);
461 }
462
463 public ClientCallable(String method, Long protocolVersion, String collection, String resource, Map<String, String> params) {
464 this.method = method;
465 this.protocolVersion = protocolVersion;
466 this.collection = collection;
467 this.resource = resource;
468 this.params = params;
469 }
470
471 public T call() throws OozieClientException {
472 try {
473 URL url = createURL(protocolVersion, collection, resource, params);
474 if (validateCommand(url.toString())) {
475 if (getDebugMode() > 0) {
476 System.out.println(method + " " + url);
477 }
478 HttpURLConnection conn = createConnection(url, method);
479 return call(conn);
480 }
481 else {
482 System.out.println("Option not supported in target server. Supported only on Oozie-2.0 or greater."
483 + " Use 'oozie help' for details");
484 throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, new Exception());
485 }
486 }
487 catch (IOException ex) {
488 throw new OozieClientException(OozieClientException.IO_ERROR, ex);
489 }
490
491 }
492
493 protected abstract T call(HttpURLConnection conn) throws IOException, OozieClientException;
494 }
495
496 static void handleError(HttpURLConnection conn) throws IOException, OozieClientException {
497 int status = conn.getResponseCode();
498 String error = conn.getHeaderField(RestConstants.OOZIE_ERROR_CODE);
499 String message = conn.getHeaderField(RestConstants.OOZIE_ERROR_MESSAGE);
500
501 if (error == null) {
502 error = "HTTP error code: " + status;
503 }
504
505 if (message == null) {
506 message = conn.getResponseMessage();
507 }
508 throw new OozieClientException(error, message);
509 }
510
511 static Map<String, String> prepareParams(String... params) {
512 Map<String, String> map = new LinkedHashMap<String, String>();
513 for (int i = 0; i < params.length; i = i + 2) {
514 map.put(params[i], params[i + 1]);
515 }
516 String doAsUserName = USER_NAME_TL.get();
517 if (doAsUserName != null) {
518 map.put(RestConstants.DO_AS_PARAM, doAsUserName);
519 }
520 return map;
521 }
522
523 public void writeToXml(Properties props, OutputStream out) throws IOException {
524 try {
525 Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
526 Element conf = doc.createElement("configuration");
527 doc.appendChild(conf);
528 conf.appendChild(doc.createTextNode("\n"));
529 for (String name : props.stringPropertyNames()) { // Properties whose key or value is not of type String are omitted.
530 String value = props.getProperty(name);
531 Element propNode = doc.createElement("property");
532 conf.appendChild(propNode);
533
534 Element nameNode = doc.createElement("name");
535 nameNode.appendChild(doc.createTextNode(name.trim()));
536 propNode.appendChild(nameNode);
537
538 Element valueNode = doc.createElement("value");
539 valueNode.appendChild(doc.createTextNode(value.trim()));
540 propNode.appendChild(valueNode);
541
542 conf.appendChild(doc.createTextNode("\n"));
543 }
544
545 DOMSource source = new DOMSource(doc);
546 StreamResult result = new StreamResult(out);
547 TransformerFactory transFactory = TransformerFactory.newInstance();
548 Transformer transformer = transFactory.newTransformer();
549 transformer.transform(source, result);
550 if (getDebugMode() > 0) {
551 result = new StreamResult(System.out);
552 transformer.transform(source, result);
553 System.out.println();
554 }
555 }
556 catch (Exception e) {
557 throw new IOException(e);
558 }
559 }
560
561 private class JobSubmit extends ClientCallable<String> {
562 private final Properties conf;
563
564 JobSubmit(Properties conf, boolean start) {
565 super("POST", RestConstants.JOBS, "", (start) ? prepareParams(RestConstants.ACTION_PARAM,
566 RestConstants.JOB_ACTION_START) : prepareParams());
567 this.conf = notNull(conf, "conf");
568 }
569
570 JobSubmit(String jobId, Properties conf) {
571 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
572 RestConstants.JOB_ACTION_RERUN));
573 this.conf = notNull(conf, "conf");
574 }
575
576 public JobSubmit(Properties conf, String jobActionDryrun) {
577 super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.ACTION_PARAM,
578 RestConstants.JOB_ACTION_DRYRUN));
579 this.conf = notNull(conf, "conf");
580 }
581
582 @Override
583 protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
584 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
585 writeToXml(conf, conn.getOutputStream());
586 if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
587 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
588 return (String) json.get(JsonTags.JOB_ID);
589 }
590 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
591 handleError(conn);
592 }
593 return null;
594 }
595 }
596
597 /**
598 * Submit a workflow job.
599 *
600 * @param conf job configuration.
601 * @return the job Id.
602 * @throws OozieClientException thrown if the job could not be submitted.
603 */
604 public String submit(Properties conf) throws OozieClientException {
605 return (new JobSubmit(conf, false)).call();
606 }
607
608 private class JobAction extends ClientCallable<Void> {
609
610 JobAction(String jobId, String action) {
611 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action));
612 }
613
614 JobAction(String jobId, String action, String params) {
615 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action,
616 RestConstants.JOB_CHANGE_VALUE, params));
617 }
618
619 @Override
620 protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
621 if (!(conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
622 handleError(conn);
623 }
624 return null;
625 }
626 }
627
628 /**
629 * dryrun for a given job
630 *
631 * @param conf Job configuration.
632 */
633 public String dryrun(Properties conf) throws OozieClientException {
634 return new JobSubmit(conf, RestConstants.JOB_ACTION_DRYRUN).call();
635 }
636
637 /**
638 * Start a workflow job.
639 *
640 * @param jobId job Id.
641 * @throws OozieClientException thrown if the job could not be started.
642 */
643 public void start(String jobId) throws OozieClientException {
644 new JobAction(jobId, RestConstants.JOB_ACTION_START).call();
645 }
646
647 /**
648 * Submit and start a workflow job.
649 *
650 * @param conf job configuration.
651 * @return the job Id.
652 * @throws OozieClientException thrown if the job could not be submitted.
653 */
654 public String run(Properties conf) throws OozieClientException {
655 return (new JobSubmit(conf, true)).call();
656 }
657
658 /**
659 * Rerun a workflow job.
660 *
661 * @param jobId job Id to rerun.
662 * @param conf configuration information for the rerun.
663 * @throws OozieClientException thrown if the job could not be started.
664 */
665 public void reRun(String jobId, Properties conf) throws OozieClientException {
666 new JobSubmit(jobId, conf).call();
667 }
668
669 /**
670 * Suspend a workflow job.
671 *
672 * @param jobId job Id.
673 * @throws OozieClientException thrown if the job could not be suspended.
674 */
675 public void suspend(String jobId) throws OozieClientException {
676 new JobAction(jobId, RestConstants.JOB_ACTION_SUSPEND).call();
677 }
678
679 /**
680 * Resume a workflow job.
681 *
682 * @param jobId job Id.
683 * @throws OozieClientException thrown if the job could not be resume.
684 */
685 public void resume(String jobId) throws OozieClientException {
686 new JobAction(jobId, RestConstants.JOB_ACTION_RESUME).call();
687 }
688
689 /**
690 * Kill a workflow job.
691 *
692 * @param jobId job Id.
693 * @throws OozieClientException thrown if the job could not be killed.
694 */
695 public void kill(String jobId) throws OozieClientException {
696 new JobAction(jobId, RestConstants.JOB_ACTION_KILL).call();
697 }
698
699 /**
700 * Change a coordinator job.
701 *
702 * @param jobId job Id.
703 * @param changeValue change value.
704 * @throws OozieClientException thrown if the job could not be changed.
705 */
706 public void change(String jobId, String changeValue) throws OozieClientException {
707 new JobAction(jobId, RestConstants.JOB_ACTION_CHANGE, changeValue).call();
708 }
709
710 private class JobInfo extends ClientCallable<WorkflowJob> {
711
712 JobInfo(String jobId, int start, int len) {
713 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
714 RestConstants.JOB_SHOW_INFO, RestConstants.OFFSET_PARAM, Integer.toString(start),
715 RestConstants.LEN_PARAM, Integer.toString(len)));
716 }
717
718 @Override
719 protected WorkflowJob call(HttpURLConnection conn) throws IOException, OozieClientException {
720 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
721 Reader reader = new InputStreamReader(conn.getInputStream());
722 JSONObject json = (JSONObject) JSONValue.parse(reader);
723 return JsonToBean.createWorkflowJob(json);
724 }
725 else {
726 handleError(conn);
727 }
728 return null;
729 }
730 }
731
732
733 private class JMSInfo extends ClientCallable<JMSConnectionInfo> {
734
735 JMSInfo() {
736 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JMS_INFO, prepareParams());
737 }
738
739 protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException {
740 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
741 Reader reader = new InputStreamReader(conn.getInputStream());
742 JSONObject json = (JSONObject) JSONValue.parse(reader);
743 return JsonToBean.createJMSConnectionInfo(json);
744 }
745 else {
746 handleError(conn);
747 }
748 return null;
749 }
750 }
751
752 private class WorkflowActionInfo extends ClientCallable<WorkflowAction> {
753 WorkflowActionInfo(String actionId) {
754 super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
755 RestConstants.JOB_SHOW_INFO));
756 }
757
758 @Override
759 protected WorkflowAction call(HttpURLConnection conn) throws IOException, OozieClientException {
760 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
761 Reader reader = new InputStreamReader(conn.getInputStream());
762 JSONObject json = (JSONObject) JSONValue.parse(reader);
763 return JsonToBean.createWorkflowAction(json);
764 }
765 else {
766 handleError(conn);
767 }
768 return null;
769 }
770 }
771
772 /**
773 * Get the info of a workflow job.
774 *
775 * @param jobId job Id.
776 * @return the job info.
777 * @throws OozieClientException thrown if the job info could not be retrieved.
778 */
779 public WorkflowJob getJobInfo(String jobId) throws OozieClientException {
780 return getJobInfo(jobId, 0, 0);
781 }
782
783 /**
784 * Get the JMS Connection info
785 * @return JMSConnectionInfo object
786 * @throws OozieClientException
787 */
788 public JMSConnectionInfo getJMSConnectionInfo() throws OozieClientException {
789 return new JMSInfo().call();
790 }
791
792 /**
793 * Get the info of a workflow job and subset actions.
794 *
795 * @param jobId job Id.
796 * @param start starting index in the list of actions belonging to the job
797 * @param len number of actions to be returned
798 * @return the job info.
799 * @throws OozieClientException thrown if the job info could not be retrieved.
800 */
801 public WorkflowJob getJobInfo(String jobId, int start, int len) throws OozieClientException {
802 return new JobInfo(jobId, start, len).call();
803 }
804
805 /**
806 * Get the info of a workflow action.
807 *
808 * @param actionId Id.
809 * @return the workflow action info.
810 * @throws OozieClientException thrown if the job info could not be retrieved.
811 */
812 public WorkflowAction getWorkflowActionInfo(String actionId) throws OozieClientException {
813 return new WorkflowActionInfo(actionId).call();
814 }
815
816 /**
817 * Get the log of a workflow job.
818 *
819 * @param jobId job Id.
820 * @return the job log.
821 * @throws OozieClientException thrown if the job info could not be retrieved.
822 */
823 public String getJobLog(String jobId) throws OozieClientException {
824 return new JobLog(jobId).call();
825 }
826
827 /**
828 * Get the log of a job.
829 *
830 * @param jobId job Id.
831 * @param logRetrievalType Based on which filter criteria the log is retrieved
832 * @param logRetrievalScope Value for the retrieval type
833 * @param ps Printstream of command line interface
834 * @throws OozieClientException thrown if the job info could not be retrieved.
835 */
836 public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps)
837 throws OozieClientException {
838 new JobLog(jobId, logRetrievalType, logRetrievalScope, ps).call();
839 }
840
841 private class JobLog extends JobMetadata {
842 JobLog(String jobId) {
843 super(jobId, RestConstants.JOB_SHOW_LOG);
844 }
845
846 JobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) {
847 super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, ps);
848 }
849 }
850
851 /**
852 * Gets the JMS topic name for a particular job
853 * @param jobId given jobId
854 * @return the JMS topic name
855 * @throws OozieClientException
856 */
857 public String getJMSTopicName(String jobId) throws OozieClientException {
858 return new JMSTopic(jobId).call();
859 }
860
861 private class JMSTopic extends ClientCallable<String> {
862
863 JMSTopic(String jobId) {
864 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
865 RestConstants.JOB_SHOW_JMS_TOPIC));
866 }
867
868 protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
869 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
870 Reader reader = new InputStreamReader(conn.getInputStream());
871 JSONObject json = (JSONObject) JSONValue.parse(reader);
872 return (String) json.get(JsonTags.JMS_TOPIC_NAME);
873 }
874 else {
875 handleError(conn);
876 }
877 return null;
878 }
879 }
880
881 /**
882 * Get the definition of a workflow job.
883 *
884 * @param jobId job Id.
885 * @return the job log.
886 * @throws OozieClientException thrown if the job info could not be retrieved.
887 */
888 public String getJobDefinition(String jobId) throws OozieClientException {
889 return new JobDefinition(jobId).call();
890 }
891
892 private class JobDefinition extends JobMetadata {
893
894 JobDefinition(String jobId) {
895 super(jobId, RestConstants.JOB_SHOW_DEFINITION);
896 }
897 }
898
899 private class JobMetadata extends ClientCallable<String> {
900 PrintStream printStream;
901
902 JobMetadata(String jobId, String metaType) {
903 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
904 metaType));
905 }
906
907 JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, PrintStream ps) {
908 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
909 metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM,
910 logRetrievalScope));
911 printStream = ps;
912 }
913
914 @Override
915 protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
916 String returnVal = null;
917 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
918 InputStream is = conn.getInputStream();
919 InputStreamReader isr = new InputStreamReader(is);
920 try {
921 if (printStream != null) {
922 sendToOutputStream(isr, -1);
923 }
924 else {
925 returnVal = getReaderAsString(isr, -1);
926 }
927 }
928 finally {
929 isr.close();
930 }
931 }
932 else {
933 handleError(conn);
934 }
935 return returnVal;
936 }
937
938 /**
939 * Output the log to command line interface
940 *
941 * @param reader reader to read into a string.
942 * @param maxLen max content length allowed, if -1 there is no limit.
943 * @param ps Printstream of command line interface
944 * @throws IOException
945 */
946 private void sendToOutputStream(Reader reader, int maxLen) throws IOException {
947 if (reader == null) {
948 throw new IllegalArgumentException("reader cannot be null");
949 }
950 StringBuilder sb = new StringBuilder();
951 char[] buffer = new char[2048];
952 int read;
953 int count = 0;
954 int noOfCharstoFlush = 1024;
955 while ((read = reader.read(buffer)) > -1) {
956 count += read;
957 if ((maxLen > -1) && (count > maxLen)) {
958 break;
959 }
960 sb.append(buffer, 0, read);
961 if (sb.length() > noOfCharstoFlush) {
962 printStream.print(sb.toString());
963 sb = new StringBuilder("");
964 }
965 }
966 printStream.print(sb.toString());
967 }
968
969 /**
970 * Return a reader as string.
971 * <p/>
972 *
973 * @param reader reader to read into a string.
974 * @param maxLen max content length allowed, if -1 there is no limit.
975 * @return the reader content.
976 * @throws IOException thrown if the resource could not be read.
977 */
978 private String getReaderAsString(Reader reader, int maxLen) throws IOException {
979 if (reader == null) {
980 throw new IllegalArgumentException("reader cannot be null");
981 }
982 StringBuffer sb = new StringBuffer();
983 char[] buffer = new char[2048];
984 int read;
985 int count = 0;
986 while ((read = reader.read(buffer)) > -1) {
987 count += read;
988
989 // read up to maxLen chars;
990 if ((maxLen > -1) && (count > maxLen)) {
991 break;
992 }
993 sb.append(buffer, 0, read);
994 }
995 reader.close();
996 return sb.toString();
997 }
998 }
999
1000 private class CoordJobInfo extends ClientCallable<CoordinatorJob> {
1001
1002 CoordJobInfo(String jobId, String filter, int start, int len) {
1003 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1004 RestConstants.JOB_SHOW_INFO, RestConstants.JOB_FILTER_PARAM, filter, RestConstants.OFFSET_PARAM, Integer.toString(start),
1005 RestConstants.LEN_PARAM, Integer.toString(len)));
1006 }
1007
1008 @Override
1009 protected CoordinatorJob call(HttpURLConnection conn) throws IOException, OozieClientException {
1010 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1011 Reader reader = new InputStreamReader(conn.getInputStream());
1012 JSONObject json = (JSONObject) JSONValue.parse(reader);
1013 return JsonToBean.createCoordinatorJob(json);
1014 }
1015 else {
1016 handleError(conn);
1017 }
1018 return null;
1019 }
1020 }
1021
1022 private class BundleJobInfo extends ClientCallable<BundleJob> {
1023
1024 BundleJobInfo(String jobId) {
1025 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1026 RestConstants.JOB_SHOW_INFO));
1027 }
1028
1029 @Override
1030 protected BundleJob call(HttpURLConnection conn) throws IOException, OozieClientException {
1031 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1032 Reader reader = new InputStreamReader(conn.getInputStream());
1033 JSONObject json = (JSONObject) JSONValue.parse(reader);
1034 return JsonToBean.createBundleJob(json);
1035 }
1036 else {
1037 handleError(conn);
1038 }
1039 return null;
1040 }
1041 }
1042
1043 private class CoordActionInfo extends ClientCallable<CoordinatorAction> {
1044 CoordActionInfo(String actionId) {
1045 super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1046 RestConstants.JOB_SHOW_INFO));
1047 }
1048
1049 @Override
1050 protected CoordinatorAction call(HttpURLConnection conn) throws IOException, OozieClientException {
1051 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1052 Reader reader = new InputStreamReader(conn.getInputStream());
1053 JSONObject json = (JSONObject) JSONValue.parse(reader);
1054 return JsonToBean.createCoordinatorAction(json);
1055 }
1056 else {
1057 handleError(conn);
1058 }
1059 return null;
1060 }
1061 }
1062
1063 /**
1064 * Get the info of a bundle job.
1065 *
1066 * @param jobId job Id.
1067 * @return the job info.
1068 * @throws OozieClientException thrown if the job info could not be retrieved.
1069 */
1070 public BundleJob getBundleJobInfo(String jobId) throws OozieClientException {
1071 return new BundleJobInfo(jobId).call();
1072 }
1073
1074 /**
1075 * Get the info of a coordinator job.
1076 *
1077 * @param jobId job Id.
1078 * @return the job info.
1079 * @throws OozieClientException thrown if the job info could not be retrieved.
1080 */
1081 public CoordinatorJob getCoordJobInfo(String jobId) throws OozieClientException {
1082 return new CoordJobInfo(jobId, null, -1, -1).call();
1083 }
1084
1085 /**
1086 * Get the info of a coordinator job and subset actions.
1087 *
1088 * @param jobId job Id.
1089 * @param filter filter the status filter
1090 * @param start starting index in the list of actions belonging to the job
1091 * @param len number of actions to be returned
1092 * @return the job info.
1093 * @throws OozieClientException thrown if the job info could not be retrieved.
1094 */
1095 public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len) throws OozieClientException {
1096 return new CoordJobInfo(jobId, filter, start, len).call();
1097 }
1098
1099 /**
1100 * Get the info of a coordinator action.
1101 *
1102 * @param actionId Id.
1103 * @return the coordinator action info.
1104 * @throws OozieClientException thrown if the job info could not be retrieved.
1105 */
1106 public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException {
1107 return new CoordActionInfo(actionId).call();
1108 }
1109
1110 private class JobsStatus extends ClientCallable<List<WorkflowJob>> {
1111
1112 JobsStatus(String filter, int start, int len) {
1113 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1114 RestConstants.JOBTYPE_PARAM, "wf", RestConstants.OFFSET_PARAM, Integer.toString(start),
1115 RestConstants.LEN_PARAM, Integer.toString(len)));
1116 }
1117
1118 @Override
1119 @SuppressWarnings("unchecked")
1120 protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1121 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1122 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1123 Reader reader = new InputStreamReader(conn.getInputStream());
1124 JSONObject json = (JSONObject) JSONValue.parse(reader);
1125 JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS);
1126 if (workflows == null) {
1127 workflows = new JSONArray();
1128 }
1129 return JsonToBean.createWorkflowJobList(workflows);
1130 }
1131 else {
1132 handleError(conn);
1133 }
1134 return null;
1135 }
1136 }
1137
1138 private class CoordJobsStatus extends ClientCallable<List<CoordinatorJob>> {
1139
1140 CoordJobsStatus(String filter, int start, int len) {
1141 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1142 RestConstants.JOBTYPE_PARAM, "coord", RestConstants.OFFSET_PARAM, Integer.toString(start),
1143 RestConstants.LEN_PARAM, Integer.toString(len)));
1144 }
1145
1146 @Override
1147 protected List<CoordinatorJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1148 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1149 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1150 Reader reader = new InputStreamReader(conn.getInputStream());
1151 JSONObject json = (JSONObject) JSONValue.parse(reader);
1152 JSONArray jobs = (JSONArray) json.get(JsonTags.COORDINATOR_JOBS);
1153 if (jobs == null) {
1154 jobs = new JSONArray();
1155 }
1156 return JsonToBean.createCoordinatorJobList(jobs);
1157 }
1158 else {
1159 handleError(conn);
1160 }
1161 return null;
1162 }
1163 }
1164
1165 private class BundleJobsStatus extends ClientCallable<List<BundleJob>> {
1166
1167 BundleJobsStatus(String filter, int start, int len) {
1168 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1169 RestConstants.JOBTYPE_PARAM, "bundle", RestConstants.OFFSET_PARAM, Integer.toString(start),
1170 RestConstants.LEN_PARAM, Integer.toString(len)));
1171 }
1172
1173 @Override
1174 protected List<BundleJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1175 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1176 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1177 Reader reader = new InputStreamReader(conn.getInputStream());
1178 JSONObject json = (JSONObject) JSONValue.parse(reader);
1179 JSONArray jobs = (JSONArray) json.get(JsonTags.BUNDLE_JOBS);
1180 if (jobs == null) {
1181 jobs = new JSONArray();
1182 }
1183 return JsonToBean.createBundleJobList(jobs);
1184 }
1185 else {
1186 handleError(conn);
1187 }
1188 return null;
1189 }
1190 }
1191
1192 private class BulkResponseStatus extends ClientCallable<List<BulkResponse>> {
1193
1194 BulkResponseStatus(String filter, int start, int len) {
1195 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_BULK_PARAM, filter,
1196 RestConstants.OFFSET_PARAM, Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len)));
1197 }
1198
1199 @Override
1200 protected List<BulkResponse> call(HttpURLConnection conn) throws IOException, OozieClientException {
1201 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1202 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1203 Reader reader = new InputStreamReader(conn.getInputStream());
1204 JSONObject json = (JSONObject) JSONValue.parse(reader);
1205 JSONArray results = (JSONArray) json.get(JsonTags.BULK_RESPONSES);
1206 if (results == null) {
1207 results = new JSONArray();
1208 }
1209 return JsonToBean.createBulkResponseList(results);
1210 }
1211 else {
1212 handleError(conn);
1213 }
1214 return null;
1215 }
1216 }
1217
1218 private class CoordRerun extends ClientCallable<List<CoordinatorAction>> {
1219
1220 CoordRerun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
1221 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1222 RestConstants.JOB_COORD_ACTION_RERUN, RestConstants.JOB_COORD_RERUN_TYPE_PARAM, rerunType,
1223 RestConstants.JOB_COORD_RERUN_SCOPE_PARAM, scope, RestConstants.JOB_COORD_RERUN_REFRESH_PARAM,
1224 Boolean.toString(refresh), RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean
1225 .toString(noCleanup)));
1226 }
1227
1228 @Override
1229 protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException {
1230 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1231 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1232 Reader reader = new InputStreamReader(conn.getInputStream());
1233 JSONObject json = (JSONObject) JSONValue.parse(reader);
1234 JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS);
1235 return JsonToBean.createCoordinatorActionList(coordActions);
1236 }
1237 else {
1238 handleError(conn);
1239 }
1240 return null;
1241 }
1242 }
1243
1244 private class BundleRerun extends ClientCallable<Void> {
1245
1246 BundleRerun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
1247 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1248 RestConstants.JOB_BUNDLE_ACTION_RERUN, RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM,
1249 coordScope, RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM, dateScope,
1250 RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, Boolean.toString(refresh),
1251 RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean.toString(noCleanup)));
1252 }
1253
1254 @Override
1255 protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1256 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1257 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1258 return null;
1259 }
1260 else {
1261 handleError(conn);
1262 }
1263 return null;
1264 }
1265 }
1266
1267 /**
1268 * Rerun coordinator actions.
1269 *
1270 * @param jobId coordinator jobId
1271 * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used
1272 * @param scope rerun scope for date or actionIds
1273 * @param refresh true if -refresh is given in command option
1274 * @param noCleanup true if -nocleanup is given in command option
1275 * @throws OozieClientException
1276 */
1277 public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh,
1278 boolean noCleanup) throws OozieClientException {
1279 return new CoordRerun(jobId, rerunType, scope, refresh, noCleanup).call();
1280 }
1281
1282 /**
1283 * Rerun bundle coordinators.
1284 *
1285 * @param jobId bundle jobId
1286 * @param coordScope rerun scope for coordinator jobs
1287 * @param dateScope rerun scope for date
1288 * @param refresh true if -refresh is given in command option
1289 * @param noCleanup true if -nocleanup is given in command option
1290 * @throws OozieClientException
1291 */
1292 public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
1293 throws OozieClientException {
1294 return new BundleRerun(jobId, coordScope, dateScope, refresh, noCleanup).call();
1295 }
1296
1297 /**
1298 * Return the info of the workflow jobs that match the filter.
1299 *
1300 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1301 * @param start jobs offset, base 1.
1302 * @param len number of jobs to return.
1303 * @return a list with the workflow jobs info, without node details.
1304 * @throws OozieClientException thrown if the jobs info could not be retrieved.
1305 */
1306 public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException {
1307 return new JobsStatus(filter, start, len).call();
1308 }
1309
1310 /**
1311 * Return the info of the workflow jobs that match the filter.
1312 * <p/>
1313 * It returns the first 100 jobs that match the filter.
1314 *
1315 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1316 * @return a list with the workflow jobs info, without node details.
1317 * @throws OozieClientException thrown if the jobs info could not be retrieved.
1318 */
1319 public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException {
1320 return getJobsInfo(filter, 1, 50);
1321 }
1322
1323 /**
1324 * Print sla info about coordinator and workflow jobs and actions.
1325 *
1326 * @param start starting offset
1327 * @param len number of results
1328 * @throws OozieClientException
1329 */
1330 public void getSlaInfo(int start, int len, String filter) throws OozieClientException {
1331 new SlaInfo(start, len, filter).call();
1332 }
1333
1334 private class SlaInfo extends ClientCallable<Void> {
1335
1336 SlaInfo(int start, int len, String filter) {
1337 super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID,
1338 Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len),
1339 RestConstants.JOBS_FILTER_PARAM, filter));
1340 }
1341
1342 @Override
1343 @SuppressWarnings("unchecked")
1344 protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1345 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1346 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1347 BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
1348 String line = null;
1349 while ((line = br.readLine()) != null) {
1350 System.out.println(line);
1351 }
1352 }
1353 else {
1354 handleError(conn);
1355 }
1356 return null;
1357 }
1358 }
1359
1360 private class JobIdAction extends ClientCallable<String> {
1361
1362 JobIdAction(String externalId) {
1363 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, "wf",
1364 RestConstants.JOBS_EXTERNAL_ID_PARAM, externalId));
1365 }
1366
1367 @Override
1368 @SuppressWarnings("unchecked")
1369 protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
1370 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1371 Reader reader = new InputStreamReader(conn.getInputStream());
1372 JSONObject json = (JSONObject) JSONValue.parse(reader);
1373 return (String) json.get(JsonTags.JOB_ID);
1374 }
1375 else {
1376 handleError(conn);
1377 }
1378 return null;
1379 }
1380 }
1381
1382 /**
1383 * Return the workflow job Id for an external Id.
1384 * <p/>
1385 * The external Id must have provided at job creation time.
1386 *
1387 * @param externalId external Id given at job creation time.
1388 * @return the workflow job Id for an external Id, <code>null</code> if none.
1389 * @throws OozieClientException thrown if the operation could not be done.
1390 */
1391 public String getJobId(String externalId) throws OozieClientException {
1392 return new JobIdAction(externalId).call();
1393 }
1394
1395 private class SetSystemMode extends ClientCallable<Void> {
1396
1397 public SetSystemMode(SYSTEM_MODE status) {
1398 super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams(
1399 RestConstants.ADMIN_SYSTEM_MODE_PARAM, status + ""));
1400 }
1401
1402 @Override
1403 public Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1404 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
1405 handleError(conn);
1406 }
1407 return null;
1408 }
1409 }
1410
1411 /**
1412 * Enable or disable safe mode. Used by OozieCLI. In safe mode, Oozie would not accept any commands except status
1413 * command to change and view the safe mode status.
1414 *
1415 * @param status true to enable safe mode, false to disable safe mode.
1416 * @throws OozieClientException if it fails to set the safe mode status.
1417 */
1418 public void setSystemMode(SYSTEM_MODE status) throws OozieClientException {
1419 new SetSystemMode(status).call();
1420 }
1421
1422 private class GetSystemMode extends ClientCallable<SYSTEM_MODE> {
1423
1424 GetSystemMode() {
1425 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams());
1426 }
1427
1428 @Override
1429 protected SYSTEM_MODE call(HttpURLConnection conn) throws IOException, OozieClientException {
1430 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1431 Reader reader = new InputStreamReader(conn.getInputStream());
1432 JSONObject json = (JSONObject) JSONValue.parse(reader);
1433 return SYSTEM_MODE.valueOf((String) json.get(JsonTags.OOZIE_SYSTEM_MODE));
1434 }
1435 else {
1436 handleError(conn);
1437 }
1438 return SYSTEM_MODE.NORMAL;
1439 }
1440 }
1441
1442 /**
1443 * Returns if Oozie is in safe mode or not.
1444 *
1445 * @return true if safe mode is ON<br>
1446 * false if safe mode is OFF
1447 * @throws OozieClientException throw if it could not obtain the safe mode status.
1448 */
1449 /*
1450 * public boolean isInSafeMode() throws OozieClientException { return new GetSafeMode().call(); }
1451 */
1452 public SYSTEM_MODE getSystemMode() throws OozieClientException {
1453 return new GetSystemMode().call();
1454 }
1455
1456 private class GetBuildVersion extends ClientCallable<String> {
1457
1458 GetBuildVersion() {
1459 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_BUILD_VERSION_RESOURCE, prepareParams());
1460 }
1461
1462 @Override
1463 protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
1464 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1465 Reader reader = new InputStreamReader(conn.getInputStream());
1466 JSONObject json = (JSONObject) JSONValue.parse(reader);
1467 return (String) json.get(JsonTags.BUILD_VERSION);
1468 }
1469 else {
1470 handleError(conn);
1471 }
1472 return null;
1473 }
1474 }
1475
1476 /**
1477 * Return the Oozie server build version.
1478 *
1479 * @return the Oozie server build version.
1480 * @throws OozieClientException throw if it the server build version could not be retrieved.
1481 */
1482 public String getServerBuildVersion() throws OozieClientException {
1483 return new GetBuildVersion().call();
1484 }
1485
1486 /**
1487 * Return the Oozie client build version.
1488 *
1489 * @return the Oozie client build version.
1490 */
1491 public String getClientBuildVersion() {
1492 return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION);
1493 }
1494
1495 /**
1496 * Return the info of the coordinator jobs that match the filter.
1497 *
1498 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1499 * @param start jobs offset, base 1.
1500 * @param len number of jobs to return.
1501 * @return a list with the coordinator jobs info
1502 * @throws OozieClientException thrown if the jobs info could not be retrieved.
1503 */
1504 public List<CoordinatorJob> getCoordJobsInfo(String filter, int start, int len) throws OozieClientException {
1505 return new CoordJobsStatus(filter, start, len).call();
1506 }
1507
1508 /**
1509 * Return the info of the bundle jobs that match the filter.
1510 *
1511 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1512 * @param start jobs offset, base 1.
1513 * @param len number of jobs to return.
1514 * @return a list with the bundle jobs info
1515 * @throws OozieClientException thrown if the jobs info could not be retrieved.
1516 */
1517 public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException {
1518 return new BundleJobsStatus(filter, start, len).call();
1519 }
1520
1521 public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException {
1522 return new BulkResponseStatus(filter, start, len).call();
1523 }
1524
1525 private class GetQueueDump extends ClientCallable<List<String>> {
1526 GetQueueDump() {
1527 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams());
1528 }
1529
1530 @Override
1531 protected List<String> call(HttpURLConnection conn) throws IOException, OozieClientException {
1532 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1533 Reader reader = new InputStreamReader(conn.getInputStream());
1534 JSONObject json = (JSONObject) JSONValue.parse(reader);
1535 JSONArray queueDumpArray = (JSONArray) json.get(JsonTags.QUEUE_DUMP);
1536
1537 List<String> list = new ArrayList<String>();
1538 list.add("[Server Queue Dump]:");
1539 for (Object o : queueDumpArray) {
1540 JSONObject entry = (JSONObject) o;
1541 if (entry.get(JsonTags.CALLABLE_DUMP) != null) {
1542 String value = (String) entry.get(JsonTags.CALLABLE_DUMP);
1543 list.add(value);
1544 }
1545 }
1546 if (queueDumpArray.size() == 0) {
1547 list.add("Queue dump is null!");
1548 }
1549
1550 list.add("******************************************");
1551 list.add("[Server Uniqueness Map Dump]:");
1552
1553 JSONArray uniqueDumpArray = (JSONArray) json.get(JsonTags.UNIQUE_MAP_DUMP);
1554 for (Object o : uniqueDumpArray) {
1555 JSONObject entry = (JSONObject) o;
1556 if (entry.get(JsonTags.UNIQUE_ENTRY_DUMP) != null) {
1557 String value = (String) entry.get(JsonTags.UNIQUE_ENTRY_DUMP);
1558 list.add(value);
1559 }
1560 }
1561 if (uniqueDumpArray.size() == 0) {
1562 list.add("Uniqueness dump is null!");
1563 }
1564 return list;
1565 }
1566 else {
1567 handleError(conn);
1568 }
1569 return null;
1570 }
1571 }
1572
1573 /**
1574 * Return the Oozie queue's commands' dump
1575 *
1576 * @return the list of strings of callable identification in queue
1577 * @throws OozieClientException throw if it the queue dump could not be retrieved.
1578 */
1579 public List<String> getQueueDump() throws OozieClientException {
1580 return new GetQueueDump().call();
1581 }
1582
1583 /**
1584 * Check if the string is not null or not empty.
1585 *
1586 * @param str
1587 * @param name
1588 * @return string
1589 */
1590 public static String notEmpty(String str, String name) {
1591 if (str == null) {
1592 throw new IllegalArgumentException(name + " cannot be null");
1593 }
1594 if (str.length() == 0) {
1595 throw new IllegalArgumentException(name + " cannot be empty");
1596 }
1597 return str;
1598 }
1599
1600 /**
1601 * Check if the object is not null.
1602 *
1603 * @param <T>
1604 * @param obj
1605 * @param name
1606 * @return string
1607 */
1608 public static <T> T notNull(T obj, String name) {
1609 if (obj == null) {
1610 throw new IllegalArgumentException(name + " cannot be null");
1611 }
1612 return obj;
1613 }
1614
1615 }