001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.client; 019 020 import java.io.BufferedReader; 021 import java.io.IOException; 022 import java.io.InputStream; 023 import java.io.InputStreamReader; 024 import java.io.OutputStream; 025 import java.io.PrintStream; 026 import java.io.Reader; 027 import java.net.HttpURLConnection; 028 import java.net.URL; 029 import java.net.URLEncoder; 030 import java.util.ArrayList; 031 import java.util.Collections; 032 import java.util.HashMap; 033 import java.util.Iterator; 034 import java.util.LinkedHashMap; 035 import java.util.List; 036 import java.util.Map; 037 import java.util.Properties; 038 import java.util.concurrent.Callable; 039 040 import javax.xml.parsers.DocumentBuilderFactory; 041 import javax.xml.transform.Transformer; 042 import javax.xml.transform.TransformerFactory; 043 import javax.xml.transform.dom.DOMSource; 044 import javax.xml.transform.stream.StreamResult; 045 046 import org.apache.oozie.BuildInfo; 047 import org.apache.oozie.client.rest.JsonTags; 048 import org.apache.oozie.client.rest.JsonToBean; 049 import org.apache.oozie.client.rest.RestConstants; 050 import org.json.simple.JSONArray; 051 import org.json.simple.JSONObject; 052 import org.json.simple.JSONValue; 053 import org.w3c.dom.Document; 054 import org.w3c.dom.Element; 055 056 /** 057 * Client API to submit and manage Oozie workflow jobs against an Oozie intance. 058 * <p/> 059 * This class is thread safe. 060 * <p/> 061 * Syntax for filter for the {@link #getJobsInfo(String)} {@link #getJobsInfo(String, int, int)} methods: 062 * <code>[NAME=VALUE][;NAME=VALUE]*</code>. 063 * <p/> 064 * Valid filter names are: 065 * <p/> 066 * <ul/> 067 * <li>name: the workflow application name from the workflow definition.</li> 068 * <li>user: the user that submitted the job.</li> 069 * <li>group: the group for the job.</li> 070 * <li>status: the status of the job.</li> 071 * </ul> 072 * <p/> 073 * The query will do an AND among all the filter names. The query will do an OR among all the filter values for the same 074 * name. Multiple values must be specified as different name value pairs. 075 */ 076 public class OozieClient { 077 078 public static final long WS_PROTOCOL_VERSION_0 = 0; 079 080 public static final long WS_PROTOCOL_VERSION_1 = 1; 081 082 public static final long WS_PROTOCOL_VERSION = 2; // pointer to current version 083 084 public static final String USER_NAME = "user.name"; 085 086 @Deprecated 087 public static final String GROUP_NAME = "group.name"; 088 089 public static final String JOB_ACL = "oozie.job.acl"; 090 091 public static final String APP_PATH = "oozie.wf.application.path"; 092 093 public static final String COORDINATOR_APP_PATH = "oozie.coord.application.path"; 094 095 public static final String BUNDLE_APP_PATH = "oozie.bundle.application.path"; 096 097 public static final String BUNDLE_ID = "oozie.bundle.id"; 098 099 public static final String EXTERNAL_ID = "oozie.wf.external.id"; 100 101 public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url"; 102 103 public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url"; 104 105 public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url"; 106 107 public static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes"; 108 109 public static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes"; 110 111 public static final String LOG_TOKEN = "oozie.wf.log.token"; 112 113 public static final String ACTION_MAX_RETRIES = "oozie.wf.action.max.retries"; 114 115 public static final String ACTION_RETRY_INTERVAL = "oozie.wf.action.retry.interval"; 116 117 public static final String FILTER_USER = "user"; 118 119 public static final String FILTER_GROUP = "group"; 120 121 public static final String FILTER_NAME = "name"; 122 123 public static final String FILTER_STATUS = "status"; 124 125 public static final String FILTER_FREQUENCY = "frequency"; 126 127 public static final String FILTER_ID = "id"; 128 129 public static final String FILTER_UNIT = "unit"; 130 131 public static final String FILTER_JOBID = "jobid"; 132 133 public static final String FILTER_APPNAME = "appname"; 134 135 public static final String FILTER_SLA_APPNAME = "app_name"; 136 137 public static final String FILTER_SLA_ID = "id"; 138 139 public static final String FILTER_SLA_PARENT_ID = "parent_id"; 140 141 public static final String FILTER_SLA_NOMINAL_START = "nominal_start"; 142 143 public static final String FILTER_SLA_NOMINAL_END = "nominal_end"; 144 145 public static final String CHANGE_VALUE_ENDTIME = "endtime"; 146 147 public static final String CHANGE_VALUE_PAUSETIME = "pausetime"; 148 149 public static final String CHANGE_VALUE_CONCURRENCY = "concurrency"; 150 151 public static final String LIBPATH = "oozie.libpath"; 152 153 public static final String USE_SYSTEM_LIBPATH = "oozie.use.system.libpath"; 154 155 public static final String OOZIE_SUSPEND_ON_NODES = "oozie.suspend.on.nodes"; 156 157 public static enum SYSTEM_MODE { 158 NORMAL, NOWEBSERVICE, SAFEMODE 159 }; 160 161 /** 162 * debugMode =0 means no debugging. > 0 means debugging on. 163 */ 164 public int debugMode = 0; 165 166 private String baseUrl; 167 private String protocolUrl; 168 private boolean validatedVersion = false; 169 private JSONArray supportedVersions; 170 private final Map<String, String> headers = new HashMap<String, String>(); 171 172 private static ThreadLocal<String> USER_NAME_TL = new ThreadLocal<String>(); 173 174 /** 175 * Allows to impersonate other users in the Oozie server. The current user 176 * must be configured as a proxyuser in Oozie. 177 * <p/> 178 * IMPORTANT: impersonation happens only with Oozie client requests done within 179 * doAs() calls. 180 * 181 * @param userName user to impersonate. 182 * @param callable callable with {@link OozieClient} calls impersonating the specified user. 183 * @return any response returned by the {@link Callable#call()} method. 184 * @throws Exception thrown by the {@link Callable#call()} method. 185 */ 186 public static <T> T doAs(String userName, Callable<T> callable) throws Exception { 187 notEmpty(userName, "userName"); 188 notNull(callable, "callable"); 189 try { 190 USER_NAME_TL.set(userName); 191 return callable.call(); 192 } 193 finally { 194 USER_NAME_TL.remove(); 195 } 196 } 197 198 protected OozieClient() { 199 } 200 201 /** 202 * Create a Workflow client instance. 203 * 204 * @param oozieUrl URL of the Oozie instance it will interact with. 205 */ 206 public OozieClient(String oozieUrl) { 207 this.baseUrl = notEmpty(oozieUrl, "oozieUrl"); 208 if (!this.baseUrl.endsWith("/")) { 209 this.baseUrl += "/"; 210 } 211 } 212 213 /** 214 * Return the Oozie URL of the workflow client instance. 215 * <p/> 216 * This URL is the base URL fo the Oozie system, with not protocol versioning. 217 * 218 * @return the Oozie URL of the workflow client instance. 219 */ 220 public String getOozieUrl() { 221 return baseUrl; 222 } 223 224 /** 225 * Return the Oozie URL used by the client and server for WS communications. 226 * <p/> 227 * This URL is the original URL plus the versioning element path. 228 * 229 * @return the Oozie URL used by the client and server for communication. 230 * @throws OozieClientException thrown in the client and the server are not protocol compatible. 231 */ 232 public String getProtocolUrl() throws OozieClientException { 233 validateWSVersion(); 234 return protocolUrl; 235 } 236 237 /** 238 * @return current debug Mode 239 */ 240 public int getDebugMode() { 241 return debugMode; 242 } 243 244 /** 245 * Set debug mode. 246 * 247 * @param debugMode : 0 means no debugging. > 0 means debugging 248 */ 249 public void setDebugMode(int debugMode) { 250 this.debugMode = debugMode; 251 } 252 253 private String getBaseURLForVersion(long protocolVersion) throws OozieClientException { 254 try { 255 if (supportedVersions == null) { 256 supportedVersions = getSupportedProtocolVersions(); 257 } 258 if (supportedVersions == null) { 259 throw new OozieClientException("HTTP error", "no response message"); 260 } 261 if (supportedVersions.contains(protocolVersion)) { 262 return baseUrl + "v" + protocolVersion + "/"; 263 } 264 else { 265 throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, "Protocol version " 266 + protocolVersion + " is not supported"); 267 } 268 } 269 catch (IOException e) { 270 throw new OozieClientException(OozieClientException.IO_ERROR, e); 271 } 272 } 273 274 /** 275 * Validate that the Oozie client and server instances are protocol compatible. 276 * 277 * @throws OozieClientException thrown in the client and the server are not protocol compatible. 278 */ 279 public synchronized void validateWSVersion() throws OozieClientException { 280 if (!validatedVersion) { 281 try { 282 supportedVersions = getSupportedProtocolVersions(); 283 if (supportedVersions == null) { 284 throw new OozieClientException("HTTP error", "no response message"); 285 } 286 if (!supportedVersions.contains(WS_PROTOCOL_VERSION) 287 && !supportedVersions.contains(WS_PROTOCOL_VERSION_1) 288 && !supportedVersions.contains(WS_PROTOCOL_VERSION_0)) { 289 StringBuilder msg = new StringBuilder(); 290 msg.append("Supported version [").append(WS_PROTOCOL_VERSION) 291 .append("] or less, Unsupported versions["); 292 String separator = ""; 293 for (Object version : supportedVersions) { 294 msg.append(separator).append(version); 295 } 296 msg.append("]"); 297 throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, msg.toString()); 298 } 299 if (supportedVersions.contains(WS_PROTOCOL_VERSION)) { 300 protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION + "/"; 301 } 302 else if (supportedVersions.contains(WS_PROTOCOL_VERSION_1)) { 303 protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_1 + "/"; 304 } 305 else { 306 if (supportedVersions.contains(WS_PROTOCOL_VERSION_0)) { 307 protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_0 + "/"; 308 } 309 } 310 } 311 catch (IOException ex) { 312 throw new OozieClientException(OozieClientException.IO_ERROR, ex); 313 } 314 validatedVersion = true; 315 } 316 } 317 318 private JSONArray getSupportedProtocolVersions() throws IOException, OozieClientException { 319 JSONArray versions = null; 320 URL url = new URL(baseUrl + RestConstants.VERSIONS); 321 HttpURLConnection conn = createConnection(url, "GET"); 322 if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) { 323 versions = (JSONArray) JSONValue.parse(new InputStreamReader(conn.getInputStream())); 324 } 325 else { 326 handleError(conn); 327 } 328 return versions; 329 } 330 331 /** 332 * Create an empty configuration with just the {@link #USER_NAME} set to the JVM user name. 333 * 334 * @return an empty configuration. 335 */ 336 public Properties createConfiguration() { 337 Properties conf = new Properties(); 338 String userName = USER_NAME_TL.get(); 339 if (userName == null) { 340 userName = System.getProperty("user.name"); 341 } 342 conf.setProperty(USER_NAME, userName); 343 return conf; 344 } 345 346 /** 347 * Set a HTTP header to be used in the WS requests by the workflow instance. 348 * 349 * @param name header name. 350 * @param value header value. 351 */ 352 public void setHeader(String name, String value) { 353 headers.put(notEmpty(name, "name"), notNull(value, "value")); 354 } 355 356 /** 357 * Get the value of a set HTTP header from the workflow instance. 358 * 359 * @param name header name. 360 * @return header value, <code>null</code> if not set. 361 */ 362 public String getHeader(String name) { 363 return headers.get(notEmpty(name, "name")); 364 } 365 366 /** 367 * Get the set HTTP header 368 * 369 * @return map of header key and value 370 */ 371 public Map<String, String> getHeaders() { 372 return headers; 373 } 374 375 /** 376 * Remove a HTTP header from the workflow client instance. 377 * 378 * @param name header name. 379 */ 380 public void removeHeader(String name) { 381 headers.remove(notEmpty(name, "name")); 382 } 383 384 /** 385 * Return an iterator with all the header names set in the workflow instance. 386 * 387 * @return header names. 388 */ 389 public Iterator<String> getHeaderNames() { 390 return Collections.unmodifiableMap(headers).keySet().iterator(); 391 } 392 393 private URL createURL(Long protocolVersion, String collection, String resource, Map<String, String> parameters) 394 throws IOException, OozieClientException { 395 validateWSVersion(); 396 StringBuilder sb = new StringBuilder(); 397 if (protocolVersion == null) { 398 sb.append(protocolUrl); 399 } 400 else { 401 sb.append(getBaseURLForVersion(protocolVersion)); 402 } 403 sb.append(collection); 404 if (resource != null && resource.length() > 0) { 405 sb.append("/").append(resource); 406 } 407 if (parameters.size() > 0) { 408 String separator = "?"; 409 for (Map.Entry<String, String> param : parameters.entrySet()) { 410 if (param.getValue() != null) { 411 sb.append(separator).append(URLEncoder.encode(param.getKey(), "UTF-8")).append("=").append( 412 URLEncoder.encode(param.getValue(), "UTF-8")); 413 separator = "&"; 414 } 415 } 416 } 417 return new URL(sb.toString()); 418 } 419 420 private boolean validateCommand(String url) { 421 { 422 if (protocolUrl.contains(baseUrl + "v0")) { 423 if (url.contains("dryrun") || url.contains("jobtype=c") || url.contains("systemmode")) { 424 return false; 425 } 426 } 427 } 428 return true; 429 } 430 431 /** 432 * Create http connection to oozie server. 433 * 434 * @param url 435 * @param method 436 * @return connection 437 * @throws IOException 438 * @throws OozieClientException 439 */ 440 protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException { 441 HttpURLConnection conn = (HttpURLConnection) url.openConnection(); 442 conn.setRequestMethod(method); 443 if (method.equals("POST") || method.equals("PUT")) { 444 conn.setDoOutput(true); 445 } 446 for (Map.Entry<String, String> header : headers.entrySet()) { 447 conn.setRequestProperty(header.getKey(), header.getValue()); 448 } 449 return conn; 450 } 451 452 protected abstract class ClientCallable<T> implements Callable<T> { 453 private final String method; 454 private final String collection; 455 private final String resource; 456 private final Map<String, String> params; 457 private final Long protocolVersion; 458 459 public ClientCallable(String method, String collection, String resource, Map<String, String> params) { 460 this(method, null, collection, resource, params); 461 } 462 463 public ClientCallable(String method, Long protocolVersion, String collection, String resource, Map<String, String> params) { 464 this.method = method; 465 this.protocolVersion = protocolVersion; 466 this.collection = collection; 467 this.resource = resource; 468 this.params = params; 469 } 470 471 public T call() throws OozieClientException { 472 try { 473 URL url = createURL(protocolVersion, collection, resource, params); 474 if (validateCommand(url.toString())) { 475 if (getDebugMode() > 0) { 476 System.out.println(method + " " + url); 477 } 478 HttpURLConnection conn = createConnection(url, method); 479 return call(conn); 480 } 481 else { 482 System.out.println("Option not supported in target server. Supported only on Oozie-2.0 or greater." 483 + " Use 'oozie help' for details"); 484 throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, new Exception()); 485 } 486 } 487 catch (IOException ex) { 488 throw new OozieClientException(OozieClientException.IO_ERROR, ex); 489 } 490 491 } 492 493 protected abstract T call(HttpURLConnection conn) throws IOException, OozieClientException; 494 } 495 496 static void handleError(HttpURLConnection conn) throws IOException, OozieClientException { 497 int status = conn.getResponseCode(); 498 String error = conn.getHeaderField(RestConstants.OOZIE_ERROR_CODE); 499 String message = conn.getHeaderField(RestConstants.OOZIE_ERROR_MESSAGE); 500 501 if (error == null) { 502 error = "HTTP error code: " + status; 503 } 504 505 if (message == null) { 506 message = conn.getResponseMessage(); 507 } 508 throw new OozieClientException(error, message); 509 } 510 511 static Map<String, String> prepareParams(String... params) { 512 Map<String, String> map = new LinkedHashMap<String, String>(); 513 for (int i = 0; i < params.length; i = i + 2) { 514 map.put(params[i], params[i + 1]); 515 } 516 String doAsUserName = USER_NAME_TL.get(); 517 if (doAsUserName != null) { 518 map.put(RestConstants.DO_AS_PARAM, doAsUserName); 519 } 520 return map; 521 } 522 523 public void writeToXml(Properties props, OutputStream out) throws IOException { 524 try { 525 Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument(); 526 Element conf = doc.createElement("configuration"); 527 doc.appendChild(conf); 528 conf.appendChild(doc.createTextNode("\n")); 529 for (String name : props.stringPropertyNames()) { // Properties whose key or value is not of type String are omitted. 530 String value = props.getProperty(name); 531 Element propNode = doc.createElement("property"); 532 conf.appendChild(propNode); 533 534 Element nameNode = doc.createElement("name"); 535 nameNode.appendChild(doc.createTextNode(name.trim())); 536 propNode.appendChild(nameNode); 537 538 Element valueNode = doc.createElement("value"); 539 valueNode.appendChild(doc.createTextNode(value.trim())); 540 propNode.appendChild(valueNode); 541 542 conf.appendChild(doc.createTextNode("\n")); 543 } 544 545 DOMSource source = new DOMSource(doc); 546 StreamResult result = new StreamResult(out); 547 TransformerFactory transFactory = TransformerFactory.newInstance(); 548 Transformer transformer = transFactory.newTransformer(); 549 transformer.transform(source, result); 550 if (getDebugMode() > 0) { 551 result = new StreamResult(System.out); 552 transformer.transform(source, result); 553 System.out.println(); 554 } 555 } 556 catch (Exception e) { 557 throw new IOException(e); 558 } 559 } 560 561 private class JobSubmit extends ClientCallable<String> { 562 private final Properties conf; 563 564 JobSubmit(Properties conf, boolean start) { 565 super("POST", RestConstants.JOBS, "", (start) ? prepareParams(RestConstants.ACTION_PARAM, 566 RestConstants.JOB_ACTION_START) : prepareParams()); 567 this.conf = notNull(conf, "conf"); 568 } 569 570 JobSubmit(String jobId, Properties conf) { 571 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, 572 RestConstants.JOB_ACTION_RERUN)); 573 this.conf = notNull(conf, "conf"); 574 } 575 576 public JobSubmit(Properties conf, String jobActionDryrun) { 577 super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.ACTION_PARAM, 578 RestConstants.JOB_ACTION_DRYRUN)); 579 this.conf = notNull(conf, "conf"); 580 } 581 582 @Override 583 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 584 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 585 writeToXml(conf, conn.getOutputStream()); 586 if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) { 587 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream())); 588 return (String) json.get(JsonTags.JOB_ID); 589 } 590 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { 591 handleError(conn); 592 } 593 return null; 594 } 595 } 596 597 /** 598 * Submit a workflow job. 599 * 600 * @param conf job configuration. 601 * @return the job Id. 602 * @throws OozieClientException thrown if the job could not be submitted. 603 */ 604 public String submit(Properties conf) throws OozieClientException { 605 return (new JobSubmit(conf, false)).call(); 606 } 607 608 private class JobAction extends ClientCallable<Void> { 609 610 JobAction(String jobId, String action) { 611 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action)); 612 } 613 614 JobAction(String jobId, String action, String params) { 615 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action, 616 RestConstants.JOB_CHANGE_VALUE, params)); 617 } 618 619 @Override 620 protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { 621 if (!(conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 622 handleError(conn); 623 } 624 return null; 625 } 626 } 627 628 /** 629 * dryrun for a given job 630 * 631 * @param conf Job configuration. 632 */ 633 public String dryrun(Properties conf) throws OozieClientException { 634 return new JobSubmit(conf, RestConstants.JOB_ACTION_DRYRUN).call(); 635 } 636 637 /** 638 * Start a workflow job. 639 * 640 * @param jobId job Id. 641 * @throws OozieClientException thrown if the job could not be started. 642 */ 643 public void start(String jobId) throws OozieClientException { 644 new JobAction(jobId, RestConstants.JOB_ACTION_START).call(); 645 } 646 647 /** 648 * Submit and start a workflow job. 649 * 650 * @param conf job configuration. 651 * @return the job Id. 652 * @throws OozieClientException thrown if the job could not be submitted. 653 */ 654 public String run(Properties conf) throws OozieClientException { 655 return (new JobSubmit(conf, true)).call(); 656 } 657 658 /** 659 * Rerun a workflow job. 660 * 661 * @param jobId job Id to rerun. 662 * @param conf configuration information for the rerun. 663 * @throws OozieClientException thrown if the job could not be started. 664 */ 665 public void reRun(String jobId, Properties conf) throws OozieClientException { 666 new JobSubmit(jobId, conf).call(); 667 } 668 669 /** 670 * Suspend a workflow job. 671 * 672 * @param jobId job Id. 673 * @throws OozieClientException thrown if the job could not be suspended. 674 */ 675 public void suspend(String jobId) throws OozieClientException { 676 new JobAction(jobId, RestConstants.JOB_ACTION_SUSPEND).call(); 677 } 678 679 /** 680 * Resume a workflow job. 681 * 682 * @param jobId job Id. 683 * @throws OozieClientException thrown if the job could not be resume. 684 */ 685 public void resume(String jobId) throws OozieClientException { 686 new JobAction(jobId, RestConstants.JOB_ACTION_RESUME).call(); 687 } 688 689 /** 690 * Kill a workflow job. 691 * 692 * @param jobId job Id. 693 * @throws OozieClientException thrown if the job could not be killed. 694 */ 695 public void kill(String jobId) throws OozieClientException { 696 new JobAction(jobId, RestConstants.JOB_ACTION_KILL).call(); 697 } 698 699 /** 700 * Change a coordinator job. 701 * 702 * @param jobId job Id. 703 * @param changeValue change value. 704 * @throws OozieClientException thrown if the job could not be changed. 705 */ 706 public void change(String jobId, String changeValue) throws OozieClientException { 707 new JobAction(jobId, RestConstants.JOB_ACTION_CHANGE, changeValue).call(); 708 } 709 710 private class JobInfo extends ClientCallable<WorkflowJob> { 711 712 JobInfo(String jobId, int start, int len) { 713 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 714 RestConstants.JOB_SHOW_INFO, RestConstants.OFFSET_PARAM, Integer.toString(start), 715 RestConstants.LEN_PARAM, Integer.toString(len))); 716 } 717 718 @Override 719 protected WorkflowJob call(HttpURLConnection conn) throws IOException, OozieClientException { 720 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 721 Reader reader = new InputStreamReader(conn.getInputStream()); 722 JSONObject json = (JSONObject) JSONValue.parse(reader); 723 return JsonToBean.createWorkflowJob(json); 724 } 725 else { 726 handleError(conn); 727 } 728 return null; 729 } 730 } 731 732 733 private class JMSInfo extends ClientCallable<JMSConnectionInfo> { 734 735 JMSInfo() { 736 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JMS_INFO, prepareParams()); 737 } 738 739 protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException { 740 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 741 Reader reader = new InputStreamReader(conn.getInputStream()); 742 JSONObject json = (JSONObject) JSONValue.parse(reader); 743 return JsonToBean.createJMSConnectionInfo(json); 744 } 745 else { 746 handleError(conn); 747 } 748 return null; 749 } 750 } 751 752 private class WorkflowActionInfo extends ClientCallable<WorkflowAction> { 753 WorkflowActionInfo(String actionId) { 754 super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM, 755 RestConstants.JOB_SHOW_INFO)); 756 } 757 758 @Override 759 protected WorkflowAction call(HttpURLConnection conn) throws IOException, OozieClientException { 760 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 761 Reader reader = new InputStreamReader(conn.getInputStream()); 762 JSONObject json = (JSONObject) JSONValue.parse(reader); 763 return JsonToBean.createWorkflowAction(json); 764 } 765 else { 766 handleError(conn); 767 } 768 return null; 769 } 770 } 771 772 /** 773 * Get the info of a workflow job. 774 * 775 * @param jobId job Id. 776 * @return the job info. 777 * @throws OozieClientException thrown if the job info could not be retrieved. 778 */ 779 public WorkflowJob getJobInfo(String jobId) throws OozieClientException { 780 return getJobInfo(jobId, 0, 0); 781 } 782 783 /** 784 * Get the JMS Connection info 785 * @return JMSConnectionInfo object 786 * @throws OozieClientException 787 */ 788 public JMSConnectionInfo getJMSConnectionInfo() throws OozieClientException { 789 return new JMSInfo().call(); 790 } 791 792 /** 793 * Get the info of a workflow job and subset actions. 794 * 795 * @param jobId job Id. 796 * @param start starting index in the list of actions belonging to the job 797 * @param len number of actions to be returned 798 * @return the job info. 799 * @throws OozieClientException thrown if the job info could not be retrieved. 800 */ 801 public WorkflowJob getJobInfo(String jobId, int start, int len) throws OozieClientException { 802 return new JobInfo(jobId, start, len).call(); 803 } 804 805 /** 806 * Get the info of a workflow action. 807 * 808 * @param actionId Id. 809 * @return the workflow action info. 810 * @throws OozieClientException thrown if the job info could not be retrieved. 811 */ 812 public WorkflowAction getWorkflowActionInfo(String actionId) throws OozieClientException { 813 return new WorkflowActionInfo(actionId).call(); 814 } 815 816 /** 817 * Get the log of a workflow job. 818 * 819 * @param jobId job Id. 820 * @return the job log. 821 * @throws OozieClientException thrown if the job info could not be retrieved. 822 */ 823 public String getJobLog(String jobId) throws OozieClientException { 824 return new JobLog(jobId).call(); 825 } 826 827 /** 828 * Get the log of a job. 829 * 830 * @param jobId job Id. 831 * @param logRetrievalType Based on which filter criteria the log is retrieved 832 * @param logRetrievalScope Value for the retrieval type 833 * @param ps Printstream of command line interface 834 * @throws OozieClientException thrown if the job info could not be retrieved. 835 */ 836 public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) 837 throws OozieClientException { 838 new JobLog(jobId, logRetrievalType, logRetrievalScope, ps).call(); 839 } 840 841 private class JobLog extends JobMetadata { 842 JobLog(String jobId) { 843 super(jobId, RestConstants.JOB_SHOW_LOG); 844 } 845 846 JobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) { 847 super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, ps); 848 } 849 } 850 851 /** 852 * Gets the JMS topic name for a particular job 853 * @param jobId given jobId 854 * @return the JMS topic name 855 * @throws OozieClientException 856 */ 857 public String getJMSTopicName(String jobId) throws OozieClientException { 858 return new JMSTopic(jobId).call(); 859 } 860 861 private class JMSTopic extends ClientCallable<String> { 862 863 JMSTopic(String jobId) { 864 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 865 RestConstants.JOB_SHOW_JMS_TOPIC)); 866 } 867 868 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 869 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 870 Reader reader = new InputStreamReader(conn.getInputStream()); 871 JSONObject json = (JSONObject) JSONValue.parse(reader); 872 return (String) json.get(JsonTags.JMS_TOPIC_NAME); 873 } 874 else { 875 handleError(conn); 876 } 877 return null; 878 } 879 } 880 881 /** 882 * Get the definition of a workflow job. 883 * 884 * @param jobId job Id. 885 * @return the job log. 886 * @throws OozieClientException thrown if the job info could not be retrieved. 887 */ 888 public String getJobDefinition(String jobId) throws OozieClientException { 889 return new JobDefinition(jobId).call(); 890 } 891 892 private class JobDefinition extends JobMetadata { 893 894 JobDefinition(String jobId) { 895 super(jobId, RestConstants.JOB_SHOW_DEFINITION); 896 } 897 } 898 899 private class JobMetadata extends ClientCallable<String> { 900 PrintStream printStream; 901 902 JobMetadata(String jobId, String metaType) { 903 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 904 metaType)); 905 } 906 907 JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, PrintStream ps) { 908 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 909 metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM, 910 logRetrievalScope)); 911 printStream = ps; 912 } 913 914 @Override 915 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 916 String returnVal = null; 917 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 918 InputStream is = conn.getInputStream(); 919 InputStreamReader isr = new InputStreamReader(is); 920 try { 921 if (printStream != null) { 922 sendToOutputStream(isr, -1); 923 } 924 else { 925 returnVal = getReaderAsString(isr, -1); 926 } 927 } 928 finally { 929 isr.close(); 930 } 931 } 932 else { 933 handleError(conn); 934 } 935 return returnVal; 936 } 937 938 /** 939 * Output the log to command line interface 940 * 941 * @param reader reader to read into a string. 942 * @param maxLen max content length allowed, if -1 there is no limit. 943 * @param ps Printstream of command line interface 944 * @throws IOException 945 */ 946 private void sendToOutputStream(Reader reader, int maxLen) throws IOException { 947 if (reader == null) { 948 throw new IllegalArgumentException("reader cannot be null"); 949 } 950 StringBuilder sb = new StringBuilder(); 951 char[] buffer = new char[2048]; 952 int read; 953 int count = 0; 954 int noOfCharstoFlush = 1024; 955 while ((read = reader.read(buffer)) > -1) { 956 count += read; 957 if ((maxLen > -1) && (count > maxLen)) { 958 break; 959 } 960 sb.append(buffer, 0, read); 961 if (sb.length() > noOfCharstoFlush) { 962 printStream.print(sb.toString()); 963 sb = new StringBuilder(""); 964 } 965 } 966 printStream.print(sb.toString()); 967 } 968 969 /** 970 * Return a reader as string. 971 * <p/> 972 * 973 * @param reader reader to read into a string. 974 * @param maxLen max content length allowed, if -1 there is no limit. 975 * @return the reader content. 976 * @throws IOException thrown if the resource could not be read. 977 */ 978 private String getReaderAsString(Reader reader, int maxLen) throws IOException { 979 if (reader == null) { 980 throw new IllegalArgumentException("reader cannot be null"); 981 } 982 StringBuffer sb = new StringBuffer(); 983 char[] buffer = new char[2048]; 984 int read; 985 int count = 0; 986 while ((read = reader.read(buffer)) > -1) { 987 count += read; 988 989 // read up to maxLen chars; 990 if ((maxLen > -1) && (count > maxLen)) { 991 break; 992 } 993 sb.append(buffer, 0, read); 994 } 995 reader.close(); 996 return sb.toString(); 997 } 998 } 999 1000 private class CoordJobInfo extends ClientCallable<CoordinatorJob> { 1001 1002 CoordJobInfo(String jobId, String filter, int start, int len) { 1003 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 1004 RestConstants.JOB_SHOW_INFO, RestConstants.JOB_FILTER_PARAM, filter, RestConstants.OFFSET_PARAM, Integer.toString(start), 1005 RestConstants.LEN_PARAM, Integer.toString(len))); 1006 } 1007 1008 @Override 1009 protected CoordinatorJob call(HttpURLConnection conn) throws IOException, OozieClientException { 1010 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1011 Reader reader = new InputStreamReader(conn.getInputStream()); 1012 JSONObject json = (JSONObject) JSONValue.parse(reader); 1013 return JsonToBean.createCoordinatorJob(json); 1014 } 1015 else { 1016 handleError(conn); 1017 } 1018 return null; 1019 } 1020 } 1021 1022 private class BundleJobInfo extends ClientCallable<BundleJob> { 1023 1024 BundleJobInfo(String jobId) { 1025 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 1026 RestConstants.JOB_SHOW_INFO)); 1027 } 1028 1029 @Override 1030 protected BundleJob call(HttpURLConnection conn) throws IOException, OozieClientException { 1031 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1032 Reader reader = new InputStreamReader(conn.getInputStream()); 1033 JSONObject json = (JSONObject) JSONValue.parse(reader); 1034 return JsonToBean.createBundleJob(json); 1035 } 1036 else { 1037 handleError(conn); 1038 } 1039 return null; 1040 } 1041 } 1042 1043 private class CoordActionInfo extends ClientCallable<CoordinatorAction> { 1044 CoordActionInfo(String actionId) { 1045 super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM, 1046 RestConstants.JOB_SHOW_INFO)); 1047 } 1048 1049 @Override 1050 protected CoordinatorAction call(HttpURLConnection conn) throws IOException, OozieClientException { 1051 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1052 Reader reader = new InputStreamReader(conn.getInputStream()); 1053 JSONObject json = (JSONObject) JSONValue.parse(reader); 1054 return JsonToBean.createCoordinatorAction(json); 1055 } 1056 else { 1057 handleError(conn); 1058 } 1059 return null; 1060 } 1061 } 1062 1063 /** 1064 * Get the info of a bundle job. 1065 * 1066 * @param jobId job Id. 1067 * @return the job info. 1068 * @throws OozieClientException thrown if the job info could not be retrieved. 1069 */ 1070 public BundleJob getBundleJobInfo(String jobId) throws OozieClientException { 1071 return new BundleJobInfo(jobId).call(); 1072 } 1073 1074 /** 1075 * Get the info of a coordinator job. 1076 * 1077 * @param jobId job Id. 1078 * @return the job info. 1079 * @throws OozieClientException thrown if the job info could not be retrieved. 1080 */ 1081 public CoordinatorJob getCoordJobInfo(String jobId) throws OozieClientException { 1082 return new CoordJobInfo(jobId, null, -1, -1).call(); 1083 } 1084 1085 /** 1086 * Get the info of a coordinator job and subset actions. 1087 * 1088 * @param jobId job Id. 1089 * @param filter filter the status filter 1090 * @param start starting index in the list of actions belonging to the job 1091 * @param len number of actions to be returned 1092 * @return the job info. 1093 * @throws OozieClientException thrown if the job info could not be retrieved. 1094 */ 1095 public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len) throws OozieClientException { 1096 return new CoordJobInfo(jobId, filter, start, len).call(); 1097 } 1098 1099 /** 1100 * Get the info of a coordinator action. 1101 * 1102 * @param actionId Id. 1103 * @return the coordinator action info. 1104 * @throws OozieClientException thrown if the job info could not be retrieved. 1105 */ 1106 public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException { 1107 return new CoordActionInfo(actionId).call(); 1108 } 1109 1110 private class JobsStatus extends ClientCallable<List<WorkflowJob>> { 1111 1112 JobsStatus(String filter, int start, int len) { 1113 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter, 1114 RestConstants.JOBTYPE_PARAM, "wf", RestConstants.OFFSET_PARAM, Integer.toString(start), 1115 RestConstants.LEN_PARAM, Integer.toString(len))); 1116 } 1117 1118 @Override 1119 @SuppressWarnings("unchecked") 1120 protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException { 1121 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1122 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1123 Reader reader = new InputStreamReader(conn.getInputStream()); 1124 JSONObject json = (JSONObject) JSONValue.parse(reader); 1125 JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS); 1126 if (workflows == null) { 1127 workflows = new JSONArray(); 1128 } 1129 return JsonToBean.createWorkflowJobList(workflows); 1130 } 1131 else { 1132 handleError(conn); 1133 } 1134 return null; 1135 } 1136 } 1137 1138 private class CoordJobsStatus extends ClientCallable<List<CoordinatorJob>> { 1139 1140 CoordJobsStatus(String filter, int start, int len) { 1141 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter, 1142 RestConstants.JOBTYPE_PARAM, "coord", RestConstants.OFFSET_PARAM, Integer.toString(start), 1143 RestConstants.LEN_PARAM, Integer.toString(len))); 1144 } 1145 1146 @Override 1147 protected List<CoordinatorJob> call(HttpURLConnection conn) throws IOException, OozieClientException { 1148 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1149 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1150 Reader reader = new InputStreamReader(conn.getInputStream()); 1151 JSONObject json = (JSONObject) JSONValue.parse(reader); 1152 JSONArray jobs = (JSONArray) json.get(JsonTags.COORDINATOR_JOBS); 1153 if (jobs == null) { 1154 jobs = new JSONArray(); 1155 } 1156 return JsonToBean.createCoordinatorJobList(jobs); 1157 } 1158 else { 1159 handleError(conn); 1160 } 1161 return null; 1162 } 1163 } 1164 1165 private class BundleJobsStatus extends ClientCallable<List<BundleJob>> { 1166 1167 BundleJobsStatus(String filter, int start, int len) { 1168 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter, 1169 RestConstants.JOBTYPE_PARAM, "bundle", RestConstants.OFFSET_PARAM, Integer.toString(start), 1170 RestConstants.LEN_PARAM, Integer.toString(len))); 1171 } 1172 1173 @Override 1174 protected List<BundleJob> call(HttpURLConnection conn) throws IOException, OozieClientException { 1175 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1176 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1177 Reader reader = new InputStreamReader(conn.getInputStream()); 1178 JSONObject json = (JSONObject) JSONValue.parse(reader); 1179 JSONArray jobs = (JSONArray) json.get(JsonTags.BUNDLE_JOBS); 1180 if (jobs == null) { 1181 jobs = new JSONArray(); 1182 } 1183 return JsonToBean.createBundleJobList(jobs); 1184 } 1185 else { 1186 handleError(conn); 1187 } 1188 return null; 1189 } 1190 } 1191 1192 private class BulkResponseStatus extends ClientCallable<List<BulkResponse>> { 1193 1194 BulkResponseStatus(String filter, int start, int len) { 1195 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_BULK_PARAM, filter, 1196 RestConstants.OFFSET_PARAM, Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len))); 1197 } 1198 1199 @Override 1200 protected List<BulkResponse> call(HttpURLConnection conn) throws IOException, OozieClientException { 1201 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1202 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1203 Reader reader = new InputStreamReader(conn.getInputStream()); 1204 JSONObject json = (JSONObject) JSONValue.parse(reader); 1205 JSONArray results = (JSONArray) json.get(JsonTags.BULK_RESPONSES); 1206 if (results == null) { 1207 results = new JSONArray(); 1208 } 1209 return JsonToBean.createBulkResponseList(results); 1210 } 1211 else { 1212 handleError(conn); 1213 } 1214 return null; 1215 } 1216 } 1217 1218 private class CoordRerun extends ClientCallable<List<CoordinatorAction>> { 1219 1220 CoordRerun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) { 1221 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, 1222 RestConstants.JOB_COORD_ACTION_RERUN, RestConstants.JOB_COORD_RERUN_TYPE_PARAM, rerunType, 1223 RestConstants.JOB_COORD_RERUN_SCOPE_PARAM, scope, RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, 1224 Boolean.toString(refresh), RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean 1225 .toString(noCleanup))); 1226 } 1227 1228 @Override 1229 protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException { 1230 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1231 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1232 Reader reader = new InputStreamReader(conn.getInputStream()); 1233 JSONObject json = (JSONObject) JSONValue.parse(reader); 1234 JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS); 1235 return JsonToBean.createCoordinatorActionList(coordActions); 1236 } 1237 else { 1238 handleError(conn); 1239 } 1240 return null; 1241 } 1242 } 1243 1244 private class BundleRerun extends ClientCallable<Void> { 1245 1246 BundleRerun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) { 1247 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, 1248 RestConstants.JOB_BUNDLE_ACTION_RERUN, RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM, 1249 coordScope, RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM, dateScope, 1250 RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, Boolean.toString(refresh), 1251 RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean.toString(noCleanup))); 1252 } 1253 1254 @Override 1255 protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { 1256 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1257 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1258 return null; 1259 } 1260 else { 1261 handleError(conn); 1262 } 1263 return null; 1264 } 1265 } 1266 1267 /** 1268 * Rerun coordinator actions. 1269 * 1270 * @param jobId coordinator jobId 1271 * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used 1272 * @param scope rerun scope for date or actionIds 1273 * @param refresh true if -refresh is given in command option 1274 * @param noCleanup true if -nocleanup is given in command option 1275 * @throws OozieClientException 1276 */ 1277 public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh, 1278 boolean noCleanup) throws OozieClientException { 1279 return new CoordRerun(jobId, rerunType, scope, refresh, noCleanup).call(); 1280 } 1281 1282 /** 1283 * Rerun bundle coordinators. 1284 * 1285 * @param jobId bundle jobId 1286 * @param coordScope rerun scope for coordinator jobs 1287 * @param dateScope rerun scope for date 1288 * @param refresh true if -refresh is given in command option 1289 * @param noCleanup true if -nocleanup is given in command option 1290 * @throws OozieClientException 1291 */ 1292 public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) 1293 throws OozieClientException { 1294 return new BundleRerun(jobId, coordScope, dateScope, refresh, noCleanup).call(); 1295 } 1296 1297 /** 1298 * Return the info of the workflow jobs that match the filter. 1299 * 1300 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. 1301 * @param start jobs offset, base 1. 1302 * @param len number of jobs to return. 1303 * @return a list with the workflow jobs info, without node details. 1304 * @throws OozieClientException thrown if the jobs info could not be retrieved. 1305 */ 1306 public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException { 1307 return new JobsStatus(filter, start, len).call(); 1308 } 1309 1310 /** 1311 * Return the info of the workflow jobs that match the filter. 1312 * <p/> 1313 * It returns the first 100 jobs that match the filter. 1314 * 1315 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. 1316 * @return a list with the workflow jobs info, without node details. 1317 * @throws OozieClientException thrown if the jobs info could not be retrieved. 1318 */ 1319 public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException { 1320 return getJobsInfo(filter, 1, 50); 1321 } 1322 1323 /** 1324 * Print sla info about coordinator and workflow jobs and actions. 1325 * 1326 * @param start starting offset 1327 * @param len number of results 1328 * @throws OozieClientException 1329 */ 1330 public void getSlaInfo(int start, int len, String filter) throws OozieClientException { 1331 new SlaInfo(start, len, filter).call(); 1332 } 1333 1334 private class SlaInfo extends ClientCallable<Void> { 1335 1336 SlaInfo(int start, int len, String filter) { 1337 super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID, 1338 Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len), 1339 RestConstants.JOBS_FILTER_PARAM, filter)); 1340 } 1341 1342 @Override 1343 @SuppressWarnings("unchecked") 1344 protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { 1345 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1346 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1347 BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream())); 1348 String line = null; 1349 while ((line = br.readLine()) != null) { 1350 System.out.println(line); 1351 } 1352 } 1353 else { 1354 handleError(conn); 1355 } 1356 return null; 1357 } 1358 } 1359 1360 private class JobIdAction extends ClientCallable<String> { 1361 1362 JobIdAction(String externalId) { 1363 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, "wf", 1364 RestConstants.JOBS_EXTERNAL_ID_PARAM, externalId)); 1365 } 1366 1367 @Override 1368 @SuppressWarnings("unchecked") 1369 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 1370 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1371 Reader reader = new InputStreamReader(conn.getInputStream()); 1372 JSONObject json = (JSONObject) JSONValue.parse(reader); 1373 return (String) json.get(JsonTags.JOB_ID); 1374 } 1375 else { 1376 handleError(conn); 1377 } 1378 return null; 1379 } 1380 } 1381 1382 /** 1383 * Return the workflow job Id for an external Id. 1384 * <p/> 1385 * The external Id must have provided at job creation time. 1386 * 1387 * @param externalId external Id given at job creation time. 1388 * @return the workflow job Id for an external Id, <code>null</code> if none. 1389 * @throws OozieClientException thrown if the operation could not be done. 1390 */ 1391 public String getJobId(String externalId) throws OozieClientException { 1392 return new JobIdAction(externalId).call(); 1393 } 1394 1395 private class SetSystemMode extends ClientCallable<Void> { 1396 1397 public SetSystemMode(SYSTEM_MODE status) { 1398 super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams( 1399 RestConstants.ADMIN_SYSTEM_MODE_PARAM, status + "")); 1400 } 1401 1402 @Override 1403 public Void call(HttpURLConnection conn) throws IOException, OozieClientException { 1404 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { 1405 handleError(conn); 1406 } 1407 return null; 1408 } 1409 } 1410 1411 /** 1412 * Enable or disable safe mode. Used by OozieCLI. In safe mode, Oozie would not accept any commands except status 1413 * command to change and view the safe mode status. 1414 * 1415 * @param status true to enable safe mode, false to disable safe mode. 1416 * @throws OozieClientException if it fails to set the safe mode status. 1417 */ 1418 public void setSystemMode(SYSTEM_MODE status) throws OozieClientException { 1419 new SetSystemMode(status).call(); 1420 } 1421 1422 private class GetSystemMode extends ClientCallable<SYSTEM_MODE> { 1423 1424 GetSystemMode() { 1425 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams()); 1426 } 1427 1428 @Override 1429 protected SYSTEM_MODE call(HttpURLConnection conn) throws IOException, OozieClientException { 1430 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1431 Reader reader = new InputStreamReader(conn.getInputStream()); 1432 JSONObject json = (JSONObject) JSONValue.parse(reader); 1433 return SYSTEM_MODE.valueOf((String) json.get(JsonTags.OOZIE_SYSTEM_MODE)); 1434 } 1435 else { 1436 handleError(conn); 1437 } 1438 return SYSTEM_MODE.NORMAL; 1439 } 1440 } 1441 1442 /** 1443 * Returns if Oozie is in safe mode or not. 1444 * 1445 * @return true if safe mode is ON<br> 1446 * false if safe mode is OFF 1447 * @throws OozieClientException throw if it could not obtain the safe mode status. 1448 */ 1449 /* 1450 * public boolean isInSafeMode() throws OozieClientException { return new GetSafeMode().call(); } 1451 */ 1452 public SYSTEM_MODE getSystemMode() throws OozieClientException { 1453 return new GetSystemMode().call(); 1454 } 1455 1456 private class GetBuildVersion extends ClientCallable<String> { 1457 1458 GetBuildVersion() { 1459 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_BUILD_VERSION_RESOURCE, prepareParams()); 1460 } 1461 1462 @Override 1463 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 1464 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1465 Reader reader = new InputStreamReader(conn.getInputStream()); 1466 JSONObject json = (JSONObject) JSONValue.parse(reader); 1467 return (String) json.get(JsonTags.BUILD_VERSION); 1468 } 1469 else { 1470 handleError(conn); 1471 } 1472 return null; 1473 } 1474 } 1475 1476 /** 1477 * Return the Oozie server build version. 1478 * 1479 * @return the Oozie server build version. 1480 * @throws OozieClientException throw if it the server build version could not be retrieved. 1481 */ 1482 public String getServerBuildVersion() throws OozieClientException { 1483 return new GetBuildVersion().call(); 1484 } 1485 1486 /** 1487 * Return the Oozie client build version. 1488 * 1489 * @return the Oozie client build version. 1490 */ 1491 public String getClientBuildVersion() { 1492 return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION); 1493 } 1494 1495 /** 1496 * Return the info of the coordinator jobs that match the filter. 1497 * 1498 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. 1499 * @param start jobs offset, base 1. 1500 * @param len number of jobs to return. 1501 * @return a list with the coordinator jobs info 1502 * @throws OozieClientException thrown if the jobs info could not be retrieved. 1503 */ 1504 public List<CoordinatorJob> getCoordJobsInfo(String filter, int start, int len) throws OozieClientException { 1505 return new CoordJobsStatus(filter, start, len).call(); 1506 } 1507 1508 /** 1509 * Return the info of the bundle jobs that match the filter. 1510 * 1511 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. 1512 * @param start jobs offset, base 1. 1513 * @param len number of jobs to return. 1514 * @return a list with the bundle jobs info 1515 * @throws OozieClientException thrown if the jobs info could not be retrieved. 1516 */ 1517 public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException { 1518 return new BundleJobsStatus(filter, start, len).call(); 1519 } 1520 1521 public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException { 1522 return new BulkResponseStatus(filter, start, len).call(); 1523 } 1524 1525 private class GetQueueDump extends ClientCallable<List<String>> { 1526 GetQueueDump() { 1527 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams()); 1528 } 1529 1530 @Override 1531 protected List<String> call(HttpURLConnection conn) throws IOException, OozieClientException { 1532 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1533 Reader reader = new InputStreamReader(conn.getInputStream()); 1534 JSONObject json = (JSONObject) JSONValue.parse(reader); 1535 JSONArray queueDumpArray = (JSONArray) json.get(JsonTags.QUEUE_DUMP); 1536 1537 List<String> list = new ArrayList<String>(); 1538 list.add("[Server Queue Dump]:"); 1539 for (Object o : queueDumpArray) { 1540 JSONObject entry = (JSONObject) o; 1541 if (entry.get(JsonTags.CALLABLE_DUMP) != null) { 1542 String value = (String) entry.get(JsonTags.CALLABLE_DUMP); 1543 list.add(value); 1544 } 1545 } 1546 if (queueDumpArray.size() == 0) { 1547 list.add("Queue dump is null!"); 1548 } 1549 1550 list.add("******************************************"); 1551 list.add("[Server Uniqueness Map Dump]:"); 1552 1553 JSONArray uniqueDumpArray = (JSONArray) json.get(JsonTags.UNIQUE_MAP_DUMP); 1554 for (Object o : uniqueDumpArray) { 1555 JSONObject entry = (JSONObject) o; 1556 if (entry.get(JsonTags.UNIQUE_ENTRY_DUMP) != null) { 1557 String value = (String) entry.get(JsonTags.UNIQUE_ENTRY_DUMP); 1558 list.add(value); 1559 } 1560 } 1561 if (uniqueDumpArray.size() == 0) { 1562 list.add("Uniqueness dump is null!"); 1563 } 1564 return list; 1565 } 1566 else { 1567 handleError(conn); 1568 } 1569 return null; 1570 } 1571 } 1572 1573 /** 1574 * Return the Oozie queue's commands' dump 1575 * 1576 * @return the list of strings of callable identification in queue 1577 * @throws OozieClientException throw if it the queue dump could not be retrieved. 1578 */ 1579 public List<String> getQueueDump() throws OozieClientException { 1580 return new GetQueueDump().call(); 1581 } 1582 1583 /** 1584 * Check if the string is not null or not empty. 1585 * 1586 * @param str 1587 * @param name 1588 * @return string 1589 */ 1590 public static String notEmpty(String str, String name) { 1591 if (str == null) { 1592 throw new IllegalArgumentException(name + " cannot be null"); 1593 } 1594 if (str.length() == 0) { 1595 throw new IllegalArgumentException(name + " cannot be empty"); 1596 } 1597 return str; 1598 } 1599 1600 /** 1601 * Check if the object is not null. 1602 * 1603 * @param <T> 1604 * @param obj 1605 * @param name 1606 * @return string 1607 */ 1608 public static <T> T notNull(T obj, String name) { 1609 if (obj == null) { 1610 throw new IllegalArgumentException(name + " cannot be null"); 1611 } 1612 return obj; 1613 } 1614 1615 }