001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.client; 020 021import org.apache.oozie.BuildInfo; 022import org.apache.oozie.client.rest.JsonTags; 023import org.apache.oozie.client.rest.JsonToBean; 024import org.apache.oozie.client.rest.RestConstants; 025import org.apache.oozie.client.retry.ConnectionRetriableClient; 026import org.json.simple.JSONArray; 027import org.json.simple.JSONObject; 028import org.json.simple.JSONValue; 029import org.w3c.dom.Document; 030import org.w3c.dom.Element; 031 032import javax.xml.parsers.DocumentBuilderFactory; 033import javax.xml.transform.Transformer; 034import javax.xml.transform.TransformerFactory; 035import javax.xml.transform.dom.DOMSource; 036import javax.xml.transform.stream.StreamResult; 037import java.io.BufferedReader; 038import java.io.File; 039import java.io.FileInputStream; 040import java.io.IOException; 041import java.io.InputStream; 042import java.io.InputStreamReader; 043import java.io.OutputStream; 044import java.io.PrintStream; 045import java.io.Reader; 046import java.net.HttpURLConnection; 047import java.net.URL; 048import java.net.URLEncoder; 049import java.util.ArrayList; 050import 
java.util.Collections; 051import java.util.HashMap; 052import java.util.HashSet; 053import java.util.Iterator; 054import java.util.LinkedHashMap; 055import java.util.List; 056import java.util.Map; 057import java.util.Map.Entry; 058import java.util.Properties; 059import java.util.Set; 060import java.util.concurrent.Callable; 061 062/** 063 * Client API to submit and manage Oozie workflow jobs against an Oozie instance. 064 * <p> 065 * This class is thread safe. 066 * <p> 067 * Syntax for filter for the {@link #getJobsInfo(String)} {@link #getJobsInfo(String, int, int)} methods: 068 * <code>[NAME=VALUE][;NAME=VALUE]*</code>. 069 * <p> 070 * Valid filter names are: 071 * <p> 072 * <ul> 073 * <li>name: the workflow application name from the workflow definition.</li> 074 * <li>user: the user that submitted the job.</li> 075 * <li>group: the group for the job.</li> 076 * <li>status: the status of the job.</li> 077 * </ul> 078 * <p> 079 * The query will do an AND among all the filter names. The query will do an OR among all the filter values for the same 080 * name. Multiple values must be specified as different name value pairs. 
081 */ 082public class OozieClient { 083 084 public static final long WS_PROTOCOL_VERSION_0 = 0; 085 086 public static final long WS_PROTOCOL_VERSION_1 = 1; 087 088 public static final long WS_PROTOCOL_VERSION = 2; // pointer to current version 089 090 public static final String USER_NAME = "user.name"; 091 092 @Deprecated 093 public static final String GROUP_NAME = "group.name"; 094 095 public static final String JOB_ACL = "oozie.job.acl"; 096 097 public static final String APP_PATH = "oozie.wf.application.path"; 098 099 public static final String COORDINATOR_APP_PATH = "oozie.coord.application.path"; 100 101 public static final String BUNDLE_APP_PATH = "oozie.bundle.application.path"; 102 103 public static final String BUNDLE_ID = "oozie.bundle.id"; 104 105 public static final String EXTERNAL_ID = "oozie.wf.external.id"; 106 107 public static final String WORKFLOW_NOTIFICATION_PROXY = "oozie.wf.workflow.notification.proxy"; 108 109 public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url"; 110 111 public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url"; 112 113 public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url"; 114 115 public static final String COORD_ACTION_NOTIFICATION_PROXY = "oozie.coord.action.notification.proxy"; 116 117 public static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes"; 118 119 public static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes"; 120 121 public static final String LOG_TOKEN = "oozie.wf.log.token"; 122 123 public static final String ACTION_MAX_RETRIES = "oozie.wf.action.max.retries"; 124 125 public static final String ACTION_RETRY_INTERVAL = "oozie.wf.action.retry.interval"; 126 127 public static final String FILTER_USER = "user"; 128 129 public static final String FILTER_GROUP = "group"; 130 131 public static final String FILTER_NAME = "name"; 132 133 public static final String FILTER_STATUS = 
"status"; 134 135 public static final String FILTER_NOMINAL_TIME = "nominaltime"; 136 137 public static final String FILTER_FREQUENCY = "frequency"; 138 139 public static final String FILTER_ID = "id"; 140 141 public static final String FILTER_UNIT = "unit"; 142 143 public static final String FILTER_JOBID = "jobid"; 144 145 public static final String FILTER_APPNAME = "appname"; 146 147 public static final String FILTER_SLA_APPNAME = "app_name"; 148 149 public static final String FILTER_SLA_ID = "id"; 150 151 public static final String FILTER_SLA_PARENT_ID = "parent_id"; 152 153 public static final String FILTER_BUNDLE = "bundle"; 154 155 public static final String FILTER_SLA_EVENT_STATUS = "event_status"; 156 157 public static final String FILTER_SLA_STATUS = "sla_status"; 158 159 public static final String FILTER_SLA_NOMINAL_START = "nominal_start"; 160 161 public static final String FILTER_SLA_NOMINAL_END = "nominal_end"; 162 163 public static final String FILTER_CREATED_TIME_START = "startcreatedtime"; 164 165 public static final String FILTER_CREATED_TIME_END = "endcreatedtime"; 166 167 public static final String SLA_DISABLE_ALERT = "oozie.sla.disable.alerts"; 168 169 public static final String SLA_ENABLE_ALERT = "oozie.sla.enable.alerts"; 170 171 public static final String SLA_DISABLE_ALERT_OLDER_THAN = SLA_DISABLE_ALERT + ".older.than"; 172 173 public static final String SLA_DISABLE_ALERT_COORD = SLA_DISABLE_ALERT + ".coord"; 174 175 public static final String CHANGE_VALUE_ENDTIME = "endtime"; 176 177 public static final String CHANGE_VALUE_PAUSETIME = "pausetime"; 178 179 public static final String CHANGE_VALUE_CONCURRENCY = "concurrency"; 180 181 public static final String CHANGE_VALUE_STATUS = "status"; 182 183 public static final String LIBPATH = "oozie.libpath"; 184 185 public static final String USE_SYSTEM_LIBPATH = "oozie.use.system.libpath"; 186 187 public static final String OOZIE_SUSPEND_ON_NODES = "oozie.suspend.on.nodes"; 188 189 public static 
final String FILTER_SORT_BY = "sortby"; 190 191 public enum SORT_BY { 192 createdTime("createdTimestamp"), lastModifiedTime("lastModifiedTimestamp"); 193 private final String fullname; 194 195 SORT_BY(String fullname) { 196 this.fullname = fullname; 197 } 198 199 public String getFullname() { 200 return fullname; 201 } 202 } 203 204 public static enum SYSTEM_MODE { 205 NORMAL, NOWEBSERVICE, SAFEMODE 206 } 207 208 private static final Set<String> COMPLETED_WF_STATUSES = new HashSet<String>(); 209 private static final Set<String> COMPLETED_COORD_AND_BUNDLE_STATUSES = new HashSet<String>(); 210 private static final Set<String> COMPLETED_COORD_ACTION_STATUSES = new HashSet<String>(); 211 static { 212 COMPLETED_WF_STATUSES.add(WorkflowJob.Status.FAILED.toString()); 213 COMPLETED_WF_STATUSES.add(WorkflowJob.Status.KILLED.toString()); 214 COMPLETED_WF_STATUSES.add(WorkflowJob.Status.SUCCEEDED.toString()); 215 COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.FAILED.toString()); 216 COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.KILLED.toString()); 217 COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.SUCCEEDED.toString()); 218 COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.DONEWITHERROR.toString()); 219 COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.IGNORED.toString()); 220 COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.FAILED.toString()); 221 COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.IGNORED.toString()); 222 COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.KILLED.toString()); 223 COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.SKIPPED.toString()); 224 COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.SUCCEEDED.toString()); 225 COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.TIMEDOUT.toString()); 226 } 227 228 /** 229 * debugMode =0 means no debugging. 1 means debugging on. 
230 */ 231 public int debugMode = 0; 232 233 private int retryCount = 4; 234 235 236 private String baseUrl; 237 private String protocolUrl; 238 private boolean validatedVersion = false; 239 private JSONArray supportedVersions; 240 private final Map<String, String> headers = new HashMap<String, String>(); 241 242 private static final ThreadLocal<String> USER_NAME_TL = new ThreadLocal<String>(); 243 244 /** 245 * Allows to impersonate other users in the Oozie server. The current user 246 * must be configured as a proxyuser in Oozie. 247 * <p> 248 * IMPORTANT: impersonation happens only with Oozie client requests done within 249 * doAs() calls. 250 * 251 * @param userName user to impersonate. 252 * @param callable callable with {@link OozieClient} calls impersonating the specified user. 253 * @return any response returned by the {@link Callable#call()} method. 254 * @throws Exception thrown by the {@link Callable#call()} method. 255 */ 256 public static <T> T doAs(String userName, Callable<T> callable) throws Exception { 257 notEmpty(userName, "userName"); 258 notNull(callable, "callable"); 259 try { 260 USER_NAME_TL.set(userName); 261 return callable.call(); 262 } 263 finally { 264 USER_NAME_TL.remove(); 265 } 266 } 267 268 protected OozieClient() { 269 } 270 271 /** 272 * Create a Workflow client instance. 273 * 274 * @param oozieUrl URL of the Oozie instance it will interact with. 275 */ 276 public OozieClient(String oozieUrl) { 277 this.baseUrl = notEmpty(oozieUrl, "oozieUrl"); 278 if (!this.baseUrl.endsWith("/")) { 279 this.baseUrl += "/"; 280 } 281 } 282 283 /** 284 * Return the Oozie URL of the workflow client instance. 285 * <p> 286 * This URL is the base URL fo the Oozie system, with not protocol versioning. 287 * 288 * @return the Oozie URL of the workflow client instance. 289 */ 290 public String getOozieUrl() { 291 return baseUrl; 292 } 293 294 /** 295 * Return the Oozie URL used by the client and server for WS communications. 
296 * <p> 297 * This URL is the original URL plus the versioning element path. 298 * 299 * @return the Oozie URL used by the client and server for communication. 300 * @throws OozieClientException thrown in the client and the server are not protocol compatible. 301 */ 302 public String getProtocolUrl() throws OozieClientException { 303 validateWSVersion(); 304 return protocolUrl; 305 } 306 307 /** 308 * @return current debug Mode 309 */ 310 public int getDebugMode() { 311 return debugMode; 312 } 313 314 /** 315 * Set debug mode. 316 * 317 * @param debugMode : 0 means no debugging. 1 means debugging 318 */ 319 public void setDebugMode(int debugMode) { 320 this.debugMode = debugMode; 321 } 322 323 public int getRetryCount() { 324 return retryCount; 325 } 326 327 328 public void setRetryCount(int retryCount) { 329 this.retryCount = retryCount; 330 } 331 332 private String getBaseURLForVersion(long protocolVersion) throws OozieClientException { 333 try { 334 if (supportedVersions == null) { 335 supportedVersions = getSupportedProtocolVersions(); 336 } 337 if (supportedVersions == null) { 338 throw new OozieClientException("HTTP error", "no response message"); 339 } 340 if (supportedVersions.contains(protocolVersion)) { 341 return baseUrl + "v" + protocolVersion + "/"; 342 } 343 else { 344 throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, "Protocol version " 345 + protocolVersion + " is not supported"); 346 } 347 } 348 catch (IOException e) { 349 throw new OozieClientException(OozieClientException.IO_ERROR, e); 350 } 351 } 352 353 /** 354 * Validate that the Oozie client and server instances are protocol compatible. 355 * 356 * @throws OozieClientException thrown in the client and the server are not protocol compatible. 
357 */ 358 public synchronized void validateWSVersion() throws OozieClientException { 359 if (!validatedVersion) { 360 try { 361 supportedVersions = getSupportedProtocolVersions(); 362 if (supportedVersions == null) { 363 throw new OozieClientException("HTTP error", "no response message"); 364 } 365 if (!supportedVersions.contains(WS_PROTOCOL_VERSION) 366 && !supportedVersions.contains(WS_PROTOCOL_VERSION_1) 367 && !supportedVersions.contains(WS_PROTOCOL_VERSION_0)) { 368 StringBuilder msg = new StringBuilder(); 369 msg.append("Supported version [").append(WS_PROTOCOL_VERSION) 370 .append("] or less, Unsupported versions["); 371 String separator = ""; 372 for (Object version : supportedVersions) { 373 msg.append(separator).append(version); 374 } 375 msg.append("]"); 376 throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, msg.toString()); 377 } 378 if (supportedVersions.contains(WS_PROTOCOL_VERSION)) { 379 protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION + "/"; 380 } 381 else if (supportedVersions.contains(WS_PROTOCOL_VERSION_1)) { 382 protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_1 + "/"; 383 } 384 else { 385 if (supportedVersions.contains(WS_PROTOCOL_VERSION_0)) { 386 protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_0 + "/"; 387 } 388 } 389 } 390 catch (IOException ex) { 391 throw new OozieClientException(OozieClientException.IO_ERROR, ex); 392 } 393 validatedVersion = true; 394 } 395 } 396 397 private JSONArray getSupportedProtocolVersions() throws IOException, OozieClientException { 398 JSONArray versions = null; 399 final URL url = new URL(baseUrl + RestConstants.VERSIONS); 400 401 HttpURLConnection conn = createRetryableConnection(url, "GET"); 402 403 if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) { 404 versions = (JSONArray) JSONValue.parse(new InputStreamReader(conn.getInputStream())); 405 } 406 else { 407 handleError(conn); 408 } 409 return versions; 410 } 411 412 /** 413 * Create an empty configuration with just the 
{@link #USER_NAME} set to the JVM user name. 414 * 415 * @return an empty configuration. 416 */ 417 public Properties createConfiguration() { 418 Properties conf = new Properties(); 419 String userName = USER_NAME_TL.get(); 420 if (userName == null) { 421 userName = System.getProperty("user.name"); 422 } 423 conf.setProperty(USER_NAME, userName); 424 return conf; 425 } 426 427 /** 428 * Set a HTTP header to be used in the WS requests by the workflow instance. 429 * 430 * @param name header name. 431 * @param value header value. 432 */ 433 public void setHeader(String name, String value) { 434 headers.put(notEmpty(name, "name"), notNull(value, "value")); 435 } 436 437 /** 438 * Get the value of a set HTTP header from the workflow instance. 439 * 440 * @param name header name. 441 * @return header value, <code>null</code> if not set. 442 */ 443 public String getHeader(String name) { 444 return headers.get(notEmpty(name, "name")); 445 } 446 447 /** 448 * Get the set HTTP header 449 * 450 * @return map of header key and value 451 */ 452 public Map<String, String> getHeaders() { 453 return headers; 454 } 455 456 /** 457 * Remove a HTTP header from the workflow client instance. 458 * 459 * @param name header name. 460 */ 461 public void removeHeader(String name) { 462 headers.remove(notEmpty(name, "name")); 463 } 464 465 /** 466 * Return an iterator with all the header names set in the workflow instance. 467 * 468 * @return header names. 
469 */ 470 public Iterator<String> getHeaderNames() { 471 return Collections.unmodifiableMap(headers).keySet().iterator(); 472 } 473 474 private URL createURL(Long protocolVersion, String collection, String resource, Map<String, String> parameters) 475 throws IOException, OozieClientException { 476 validateWSVersion(); 477 StringBuilder sb = new StringBuilder(); 478 if (protocolVersion == null) { 479 sb.append(protocolUrl); 480 } 481 else { 482 sb.append(getBaseURLForVersion(protocolVersion)); 483 } 484 sb.append(collection); 485 if (resource != null && resource.length() > 0) { 486 sb.append("/").append(resource); 487 } 488 if (parameters.size() > 0) { 489 String separator = "?"; 490 for (Map.Entry<String, String> param : parameters.entrySet()) { 491 if (param.getValue() != null) { 492 sb.append(separator).append(URLEncoder.encode(param.getKey(), "UTF-8")).append("=").append( 493 URLEncoder.encode(param.getValue(), "UTF-8")); 494 separator = "&"; 495 } 496 } 497 } 498 return new URL(sb.toString()); 499 } 500 501 private boolean validateCommand(String url) { 502 { 503 if (protocolUrl.contains(baseUrl + "v0")) { 504 if (url.contains("dryrun") || url.contains("jobtype=c") || url.contains("systemmode")) { 505 return false; 506 } 507 } 508 } 509 return true; 510 } 511 /** 512 * Create retryable http connection to oozie server. 513 * 514 * @param url 515 * @param method 516 * @return connection 517 * @throws IOException 518 */ 519 protected HttpURLConnection createRetryableConnection(final URL url, final String method) throws IOException{ 520 return (HttpURLConnection) new ConnectionRetriableClient(getRetryCount()) { 521 @Override 522 public Object doExecute(URL url, String method) throws IOException, OozieClientException { 523 HttpURLConnection conn = createConnection(url, method); 524 return conn; 525 } 526 }.execute(url, method); 527 } 528 529 /** 530 * Create http connection to oozie server. 
531 * 532 * @param url 533 * @param method 534 * @return connection 535 * @throws IOException 536 * @throws OozieClientException 537 */ 538 protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException { 539 HttpURLConnection conn = (HttpURLConnection) url.openConnection(); 540 conn.setRequestMethod(method); 541 if (method.equals("POST") || method.equals("PUT")) { 542 conn.setDoOutput(true); 543 } 544 for (Map.Entry<String, String> header : headers.entrySet()) { 545 conn.setRequestProperty(header.getKey(), header.getValue()); 546 } 547 return conn; 548 } 549 550 protected abstract class ClientCallable<T> implements Callable<T> { 551 private final String method; 552 private final String collection; 553 private final String resource; 554 private final Map<String, String> params; 555 private final Long protocolVersion; 556 557 public ClientCallable(String method, String collection, String resource, Map<String, String> params) { 558 this(method, null, collection, resource, params); 559 } 560 561 public ClientCallable(String method, Long protocolVersion, String collection, String resource, Map<String, String> params) { 562 this.method = method; 563 this.protocolVersion = protocolVersion; 564 this.collection = collection; 565 this.resource = resource; 566 this.params = params; 567 } 568 569 public T call() throws OozieClientException { 570 try { 571 URL url = createURL(protocolVersion, collection, resource, params); 572 if (validateCommand(url.toString())) { 573 if (getDebugMode() > 0) { 574 System.out.println(method + " " + url); 575 } 576 return call(createRetryableConnection(url, method)); 577 } 578 else { 579 System.out.println("Option not supported in target server. Supported only on Oozie-2.0 or greater." 
580 + " Use 'oozie help' for details"); 581 throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, new Exception()); 582 } 583 } 584 catch (IOException ex) { 585 throw new OozieClientException(OozieClientException.IO_ERROR, ex); 586 } 587 } 588 589 protected abstract T call(HttpURLConnection conn) throws IOException, OozieClientException; 590 } 591 592 protected abstract class MapClientCallable extends ClientCallable<Map<String, String>> { 593 594 MapClientCallable(String method, String collection, String resource, Map<String, String> params) { 595 super(method, collection, resource, params); 596 } 597 598 @Override 599 protected Map<String, String> call(HttpURLConnection conn) throws IOException, OozieClientException { 600 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 601 Reader reader = new InputStreamReader(conn.getInputStream()); 602 JSONObject json = (JSONObject) JSONValue.parse(reader); 603 Map<String, String> map = new HashMap<String, String>(); 604 for (Object key : json.keySet()) { 605 map.put((String)key, (String)json.get(key)); 606 } 607 return map; 608 } 609 else { 610 handleError(conn); 611 } 612 return null; 613 } 614 } 615 616 static void handleError(HttpURLConnection conn) throws IOException, OozieClientException { 617 int status = conn.getResponseCode(); 618 String error = conn.getHeaderField(RestConstants.OOZIE_ERROR_CODE); 619 String message = conn.getHeaderField(RestConstants.OOZIE_ERROR_MESSAGE); 620 621 if (error == null) { 622 error = "HTTP error code: " + status; 623 } 624 625 if (message == null) { 626 message = conn.getResponseMessage(); 627 } 628 throw new OozieClientException(error, message); 629 } 630 631 static Map<String, String> prepareParams(String... 
params) { 632 Map<String, String> map = new LinkedHashMap<String, String>(); 633 for (int i = 0; i < params.length; i = i + 2) { 634 map.put(params[i], params[i + 1]); 635 } 636 String doAsUserName = USER_NAME_TL.get(); 637 if (doAsUserName != null) { 638 map.put(RestConstants.DO_AS_PARAM, doAsUserName); 639 } 640 return map; 641 } 642 643 public void writeToXml(Properties props, OutputStream out) throws IOException { 644 try { 645 Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument(); 646 Element conf = doc.createElement("configuration"); 647 doc.appendChild(conf); 648 conf.appendChild(doc.createTextNode("\n")); 649 for (String name : props.stringPropertyNames()) { // Properties whose key or value is not of type String are omitted. 650 String value = props.getProperty(name); 651 Element propNode = doc.createElement("property"); 652 conf.appendChild(propNode); 653 654 Element nameNode = doc.createElement("name"); 655 nameNode.appendChild(doc.createTextNode(name.trim())); 656 propNode.appendChild(nameNode); 657 658 Element valueNode = doc.createElement("value"); 659 valueNode.appendChild(doc.createTextNode(value.trim())); 660 propNode.appendChild(valueNode); 661 662 conf.appendChild(doc.createTextNode("\n")); 663 } 664 665 DOMSource source = new DOMSource(doc); 666 StreamResult result = new StreamResult(out); 667 TransformerFactory transFactory = TransformerFactory.newInstance(); 668 transFactory.setFeature("http://javax.xml.XMLConstants/feature/secure-processing", true); 669 Transformer transformer = transFactory.newTransformer(); 670 transformer.transform(source, result); 671 if (getDebugMode() > 0) { 672 result = new StreamResult(System.out); 673 transformer.transform(source, result); 674 System.out.println(); 675 } 676 } 677 catch (Exception e) { 678 throw new IOException(e); 679 } 680 } 681 682 private class JobSubmit extends ClientCallable<String> { 683 private final Properties conf; 684 685 JobSubmit(Properties conf, boolean 
start) { 686 super("POST", RestConstants.JOBS, "", (start) ? prepareParams(RestConstants.ACTION_PARAM, 687 RestConstants.JOB_ACTION_START) : prepareParams()); 688 this.conf = notNull(conf, "conf"); 689 } 690 691 JobSubmit(String jobId, Properties conf) { 692 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, 693 RestConstants.JOB_ACTION_RERUN)); 694 this.conf = notNull(conf, "conf"); 695 } 696 697 public JobSubmit(Properties conf, String jobActionDryrun) { 698 super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.ACTION_PARAM, 699 RestConstants.JOB_ACTION_DRYRUN)); 700 this.conf = notNull(conf, "conf"); 701 } 702 703 @Override 704 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 705 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 706 writeToXml(conf, conn.getOutputStream()); 707 if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) { 708 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream())); 709 return (String) json.get(JsonTags.JOB_ID); 710 } 711 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { 712 handleError(conn); 713 } 714 return null; 715 } 716 } 717 718 /** 719 * Submit a workflow job. 720 * 721 * @param conf job configuration. 722 * @return the job Id. 723 * @throws OozieClientException thrown if the job could not be submitted. 
724 */ 725 public String submit(Properties conf) throws OozieClientException { 726 return (new JobSubmit(conf, false)).call(); 727 } 728 729 private class JobAction extends ClientCallable<Void> { 730 731 JobAction(String jobId, String action) { 732 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action)); 733 } 734 735 JobAction(String jobId, String action, String params) { 736 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action, 737 RestConstants.JOB_CHANGE_VALUE, params)); 738 } 739 740 @Override 741 protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { 742 if (!(conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 743 handleError(conn); 744 } 745 return null; 746 } 747 } 748 749 private class JobsAction extends ClientCallable<JSONObject> { 750 751 JobsAction(String action, String filter, String jobType, int start, int len) { 752 super("PUT", RestConstants.JOBS, "", 753 prepareParams(RestConstants.ACTION_PARAM, action, 754 RestConstants.JOB_FILTER_PARAM, filter, RestConstants.JOBTYPE_PARAM, jobType, 755 RestConstants.OFFSET_PARAM, Integer.toString(start), 756 RestConstants.LEN_PARAM, Integer.toString(len))); 757 } 758 759 @Override 760 protected JSONObject call(HttpURLConnection conn) throws IOException, OozieClientException { 761 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 762 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 763 Reader reader = new InputStreamReader(conn.getInputStream()); 764 JSONObject json = (JSONObject) JSONValue.parse(reader); 765 return json; 766 } 767 else { 768 handleError(conn); 769 } 770 return null; 771 } 772 } 773 /** 774 * Update coord definition. 
775 * 776 * @param jobId the job id 777 * @param conf the conf 778 * @param dryrun the dryrun 779 * @param showDiff the show diff 780 * @return the string 781 * @throws OozieClientException the oozie client exception 782 */ 783 public String updateCoord(String jobId, Properties conf, String dryrun, String showDiff) 784 throws OozieClientException { 785 return (new UpdateCoord(jobId, conf, dryrun, showDiff)).call(); 786 } 787 788 /** 789 * Update coord definition without properties. 790 * 791 * @param jobId the job id 792 * @param dryrun the dryrun 793 * @param showDiff the show diff 794 * @return the string 795 * @throws OozieClientException the oozie client exception 796 */ 797 public String updateCoord(String jobId, String dryrun, String showDiff) throws OozieClientException { 798 return (new UpdateCoord(jobId, dryrun, showDiff)).call(); 799 } 800 801 /** 802 * The Class UpdateCoord. 803 */ 804 private class UpdateCoord extends ClientCallable<String> { 805 private final Properties conf; 806 807 public UpdateCoord(String jobId, Properties conf, String jobActionDryrun, String showDiff) { 808 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, 809 RestConstants.JOB_COORD_UPDATE, RestConstants.JOB_ACTION_DRYRUN, jobActionDryrun, 810 RestConstants.JOB_ACTION_SHOWDIFF, showDiff)); 811 this.conf = conf; 812 } 813 814 public UpdateCoord(String jobId, String jobActionDryrun, String showDiff) { 815 this(jobId, new Properties(), jobActionDryrun, showDiff); 816 } 817 818 @Override 819 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 820 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 821 writeToXml(conf, conn.getOutputStream()); 822 823 if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) { 824 JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream())); 825 JSONObject update = (JSONObject) json.get(JsonTags.COORD_UPDATE); 826 
if (update != null) { 827 return (String) update.get(JsonTags.COORD_UPDATE_DIFF); 828 } 829 else { 830 return ""; 831 } 832 } 833 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { 834 handleError(conn); 835 } 836 return null; 837 } 838 } 839 840 /** 841 * dryrun for a given job 842 * 843 * @param conf Job configuration. 844 */ 845 public String dryrun(Properties conf) throws OozieClientException { 846 return new JobSubmit(conf, RestConstants.JOB_ACTION_DRYRUN).call(); 847 } 848 849 /** 850 * Start a workflow job. 851 * 852 * @param jobId job Id. 853 * @throws OozieClientException thrown if the job could not be started. 854 */ 855 public void start(String jobId) throws OozieClientException { 856 new JobAction(jobId, RestConstants.JOB_ACTION_START).call(); 857 } 858 859 /** 860 * Submit and start a workflow job. 861 * 862 * @param conf job configuration. 863 * @return the job Id. 864 * @throws OozieClientException thrown if the job could not be submitted. 865 */ 866 public String run(Properties conf) throws OozieClientException { 867 return (new JobSubmit(conf, true)).call(); 868 } 869 870 /** 871 * Rerun a workflow job. 872 * 873 * @param jobId job Id to rerun. 874 * @param conf configuration information for the rerun. 875 * @throws OozieClientException thrown if the job could not be started. 876 */ 877 public void reRun(String jobId, Properties conf) throws OozieClientException { 878 new JobSubmit(jobId, conf).call(); 879 } 880 881 /** 882 * Suspend a workflow job. 883 * 884 * @param jobId job Id. 885 * @throws OozieClientException thrown if the job could not be suspended. 886 */ 887 public void suspend(String jobId) throws OozieClientException { 888 new JobAction(jobId, RestConstants.JOB_ACTION_SUSPEND).call(); 889 } 890 891 /** 892 * Resume a workflow job. 893 * 894 * @param jobId job Id. 895 * @throws OozieClientException thrown if the job could not be resume. 
896 */ 897 public void resume(String jobId) throws OozieClientException { 898 new JobAction(jobId, RestConstants.JOB_ACTION_RESUME).call(); 899 } 900 901 /** 902 * Kill a workflow/coord/bundle job. 903 * 904 * @param jobId job Id. 905 * @throws OozieClientException thrown if the job could not be killed. 906 */ 907 public void kill(String jobId) throws OozieClientException { 908 new JobAction(jobId, RestConstants.JOB_ACTION_KILL).call(); 909 } 910 911 /** 912 * Kill coordinator actions 913 * @param jobId coordinator Job Id 914 * @param rangeType type 'date' if -date is used, 'action-num' if -action is used 915 * @param scope kill scope for date or action nums 916 * @return list of coordinator actions that underwent kill 917 * @throws OozieClientException thrown if some actions could not be killed. 918 */ 919 public List<CoordinatorAction> kill(String jobId, String rangeType, String scope) throws OozieClientException { 920 return new CoordActionsKill(jobId, rangeType, scope).call(); 921 } 922 923 public JSONObject bulkModifyJobs(String actionType, String filter, String jobType, int start, int len) 924 throws OozieClientException { 925 return new JobsAction(actionType, filter, jobType, start, len).call(); 926 } 927 928 public JSONObject killJobs(String filter, String jobType, int start, int len) 929 throws OozieClientException { 930 return bulkModifyJobs("kill", filter, jobType, start, len); 931 } 932 933 public JSONObject suspendJobs(String filter, String jobType, int start, int len) 934 throws OozieClientException { 935 return bulkModifyJobs("suspend", filter, jobType, start, len); 936 } 937 938 public JSONObject resumeJobs(String filter, String jobType, int start, int len) 939 throws OozieClientException { 940 return bulkModifyJobs("resume", filter, jobType, start, len); 941 } 942 /** 943 * Change a coordinator job. 944 * 945 * @param jobId job Id. 946 * @param changeValue change value. 947 * @throws OozieClientException thrown if the job could not be changed. 
948 */ 949 public void change(String jobId, String changeValue) throws OozieClientException { 950 new JobAction(jobId, RestConstants.JOB_ACTION_CHANGE, changeValue).call(); 951 } 952 953 /** 954 * Ignore a coordinator job. 955 * 956 * @param jobId coord job Id. 957 * @param scope list of coord actions to be ignored 958 * @throws OozieClientException thrown if the job could not be changed. 959 */ 960 public List<CoordinatorAction> ignore(String jobId, String scope) throws OozieClientException { 961 return new CoordIgnore(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, scope).call(); 962 } 963 964 private class JobInfo extends ClientCallable<WorkflowJob> { 965 966 JobInfo(String jobId, int start, int len) { 967 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 968 RestConstants.JOB_SHOW_INFO, RestConstants.OFFSET_PARAM, Integer.toString(start), 969 RestConstants.LEN_PARAM, Integer.toString(len))); 970 } 971 972 @Override 973 protected WorkflowJob call(HttpURLConnection conn) throws IOException, OozieClientException { 974 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 975 Reader reader = new InputStreamReader(conn.getInputStream()); 976 JSONObject json = (JSONObject) JSONValue.parse(reader); 977 return JsonToBean.createWorkflowJob(json); 978 } 979 else { 980 handleError(conn); 981 } 982 return null; 983 } 984 } 985 986 private class JMSInfo extends ClientCallable<JMSConnectionInfo> { 987 988 JMSInfo() { 989 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JMS_INFO, prepareParams()); 990 } 991 992 protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException { 993 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 994 Reader reader = new InputStreamReader(conn.getInputStream()); 995 JSONObject json = (JSONObject) JSONValue.parse(reader); 996 return JsonToBean.createJMSConnectionInfo(json); 997 } 998 else { 999 handleError(conn); 1000 } 1001 return null; 
1002 } 1003 } 1004 1005 private class WorkflowActionInfo extends ClientCallable<WorkflowAction> { 1006 WorkflowActionInfo(String actionId) { 1007 super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM, 1008 RestConstants.JOB_SHOW_INFO)); 1009 } 1010 1011 @Override 1012 protected WorkflowAction call(HttpURLConnection conn) throws IOException, OozieClientException { 1013 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1014 Reader reader = new InputStreamReader(conn.getInputStream()); 1015 JSONObject json = (JSONObject) JSONValue.parse(reader); 1016 return JsonToBean.createWorkflowAction(json); 1017 } 1018 else { 1019 handleError(conn); 1020 } 1021 return null; 1022 } 1023 } 1024 1025 /** 1026 * Get the info of a workflow job. 1027 * 1028 * @param jobId job Id. 1029 * @return the job info. 1030 * @throws OozieClientException thrown if the job info could not be retrieved. 1031 */ 1032 public WorkflowJob getJobInfo(String jobId) throws OozieClientException { 1033 return getJobInfo(jobId, 0, 0); 1034 } 1035 1036 /** 1037 * Get the JMS Connection info 1038 * @return JMSConnectionInfo object 1039 * @throws OozieClientException 1040 */ 1041 public JMSConnectionInfo getJMSConnectionInfo() throws OozieClientException { 1042 return new JMSInfo().call(); 1043 } 1044 1045 /** 1046 * Get the info of a workflow job and subset actions. 1047 * 1048 * @param jobId job Id. 1049 * @param start starting index in the list of actions belonging to the job 1050 * @param len number of actions to be returned 1051 * @return the job info. 1052 * @throws OozieClientException thrown if the job info could not be retrieved. 1053 */ 1054 public WorkflowJob getJobInfo(String jobId, int start, int len) throws OozieClientException { 1055 return new JobInfo(jobId, start, len).call(); 1056 } 1057 1058 /** 1059 * Get the info of a workflow action. 1060 * 1061 * @param actionId Id. 1062 * @return the workflow action info. 
1063 * @throws OozieClientException thrown if the job info could not be retrieved. 1064 */ 1065 public WorkflowAction getWorkflowActionInfo(String actionId) throws OozieClientException { 1066 return new WorkflowActionInfo(actionId).call(); 1067 } 1068 1069 /** 1070 * Get the log of a workflow job. 1071 * 1072 * @param jobId job Id. 1073 * @return the job log. 1074 * @throws OozieClientException thrown if the job info could not be retrieved. 1075 */ 1076 public String getJobLog(String jobId) throws OozieClientException { 1077 return new JobLog(jobId).call(); 1078 } 1079 1080 /** 1081 * Get the audit log of a job. 1082 * 1083 * @param jobId 1084 * @param ps 1085 * @throws OozieClientException 1086 */ 1087 public void getJobAuditLog(String jobId, PrintStream ps) throws OozieClientException { 1088 new JobAuditLog(jobId, ps).call(); 1089 } 1090 1091 /** 1092 * Get the log of a job. 1093 * 1094 * @param jobId job Id. 1095 * @param logRetrievalType Based on which filter criteria the log is retrieved 1096 * @param logRetrievalScope Value for the retrieval type 1097 * @param logFilter log filter 1098 * @param ps Printstream of command line interface 1099 * @throws OozieClientException thrown if the job info could not be retrieved. 1100 */ 1101 public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter, 1102 PrintStream ps) throws OozieClientException { 1103 new JobLog(jobId, logRetrievalType, logRetrievalScope, logFilter, ps).call(); 1104 } 1105 1106 /** 1107 * Get the error log of a job. 1108 * 1109 * @param jobId 1110 * @param ps 1111 * @throws OozieClientException 1112 */ 1113 public void getJobErrorLog(String jobId, PrintStream ps) throws OozieClientException { 1114 new JobErrorLog(jobId, ps).call(); 1115 } 1116 1117 /** 1118 * Get the log of a job. 1119 * 1120 * @param jobId job Id. 
1121 * @param logRetrievalType Based on which filter criteria the log is retrieved 1122 * @param logRetrievalScope Value for the retrieval type 1123 * @param ps Printstream of command line interface 1124 * @throws OozieClientException thrown if the job info could not be retrieved. 1125 */ 1126 public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) 1127 throws OozieClientException { 1128 getJobLog(jobId, logRetrievalType, logRetrievalScope, null, ps); 1129 } 1130 1131 private class JobLog extends JobMetadata { 1132 JobLog(String jobId) { 1133 super(jobId, RestConstants.JOB_SHOW_LOG); 1134 } 1135 JobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter, PrintStream ps) { 1136 super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, logFilter, ps); 1137 } 1138 } 1139 1140 private class JobErrorLog extends JobMetadata { 1141 JobErrorLog(String jobId, PrintStream ps) { 1142 super(jobId, RestConstants.JOB_SHOW_ERROR_LOG, ps); 1143 } 1144 } 1145 1146 private class JobAuditLog extends JobMetadata { 1147 JobAuditLog(String jobId, PrintStream ps) { 1148 super(jobId, RestConstants.JOB_SHOW_AUDIT_LOG, ps); 1149 } 1150 } 1151 1152 1153 /** 1154 * Gets the JMS topic name for a particular job 1155 * @param jobId given jobId 1156 * @return the JMS topic name 1157 * @throws OozieClientException 1158 */ 1159 public String getJMSTopicName(String jobId) throws OozieClientException { 1160 return new JMSTopic(jobId).call(); 1161 } 1162 1163 private class JMSTopic extends ClientCallable<String> { 1164 1165 JMSTopic(String jobId) { 1166 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 1167 RestConstants.JOB_SHOW_JMS_TOPIC)); 1168 } 1169 1170 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 1171 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1172 Reader reader = new 
InputStreamReader(conn.getInputStream()); 1173 JSONObject json = (JSONObject) JSONValue.parse(reader); 1174 return (String) json.get(JsonTags.JMS_TOPIC_NAME); 1175 } 1176 else { 1177 handleError(conn); 1178 } 1179 return null; 1180 } 1181 } 1182 1183 /** 1184 * Get the definition of a workflow job. 1185 * 1186 * @param jobId job Id. 1187 * @return the job log. 1188 * @throws OozieClientException thrown if the job info could not be retrieved. 1189 */ 1190 public String getJobDefinition(String jobId) throws OozieClientException { 1191 return new JobDefinition(jobId).call(); 1192 } 1193 1194 private class JobDefinition extends JobMetadata { 1195 1196 JobDefinition(String jobId) { 1197 super(jobId, RestConstants.JOB_SHOW_DEFINITION); 1198 } 1199 } 1200 1201 private class JobMetadata extends ClientCallable<String> { 1202 PrintStream printStream; 1203 1204 JobMetadata(String jobId, String metaType) { 1205 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 1206 metaType)); 1207 } 1208 1209 JobMetadata(String jobId, String metaType, PrintStream ps) { 1210 this(jobId, metaType); 1211 printStream = ps; 1212 1213 } 1214 1215 JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, String logFilter, 1216 PrintStream ps) { 1217 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 1218 metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM, 1219 logRetrievalScope, RestConstants.LOG_FILTER_OPTION, logFilter)); 1220 printStream = ps; 1221 } 1222 1223 @Override 1224 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 1225 String returnVal = null; 1226 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1227 InputStream is = conn.getInputStream(); 1228 InputStreamReader isr = new InputStreamReader(is); 1229 try { 1230 if (printStream != null) { 1231 
sendToOutputStream(isr, -1); 1232 } 1233 else { 1234 returnVal = getReaderAsString(isr, -1); 1235 } 1236 } 1237 finally { 1238 isr.close(); 1239 } 1240 } 1241 else { 1242 handleError(conn); 1243 } 1244 return returnVal; 1245 } 1246 1247 /** 1248 * Output the log to command line interface 1249 * 1250 * @param reader reader to read into a string. 1251 * @param maxLen max content length allowed, if -1 there is no limit. 1252 * @throws IOException 1253 */ 1254 private void sendToOutputStream(Reader reader, int maxLen) throws IOException { 1255 notNull(reader, "reader"); 1256 StringBuilder sb = new StringBuilder(); 1257 char[] buffer = new char[2048]; 1258 int read; 1259 int count = 0; 1260 int noOfCharstoFlush = 1024; 1261 while ((read = reader.read(buffer)) > -1) { 1262 count += read; 1263 if ((maxLen > -1) && (count > maxLen)) { 1264 break; 1265 } 1266 sb.append(buffer, 0, read); 1267 if (sb.length() > noOfCharstoFlush) { 1268 printStream.print(sb.toString()); 1269 sb = new StringBuilder(""); 1270 } 1271 } 1272 printStream.print(sb.toString()); 1273 } 1274 1275 /** 1276 * Return a reader as string. 1277 * <p> 1278 * 1279 * @param reader reader to read into a string. 1280 * @param maxLen max content length allowed, if -1 there is no limit. 1281 * @return the reader content. 1282 * @throws IOException thrown if the resource could not be read. 
1283 */ 1284 private String getReaderAsString(Reader reader, int maxLen) throws IOException { 1285 notNull(reader, "reader"); 1286 StringBuffer sb = new StringBuffer(); 1287 char[] buffer = new char[2048]; 1288 int read; 1289 int count = 0; 1290 while ((read = reader.read(buffer)) > -1) { 1291 count += read; 1292 1293 // read up to maxLen chars; 1294 if ((maxLen > -1) && (count > maxLen)) { 1295 break; 1296 } 1297 sb.append(buffer, 0, read); 1298 } 1299 reader.close(); 1300 return sb.toString(); 1301 } 1302 } 1303 1304 private class CoordJobInfo extends ClientCallable<CoordinatorJob> { 1305 1306 CoordJobInfo(String jobId, String filter, int start, int len, String order) { 1307 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 1308 RestConstants.JOB_SHOW_INFO, RestConstants.JOB_FILTER_PARAM, filter, RestConstants.OFFSET_PARAM, 1309 Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len), RestConstants.ORDER_PARAM, 1310 order)); 1311 } 1312 1313 @Override 1314 protected CoordinatorJob call(HttpURLConnection conn) throws IOException, OozieClientException { 1315 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1316 Reader reader = new InputStreamReader(conn.getInputStream()); 1317 JSONObject json = (JSONObject) JSONValue.parse(reader); 1318 return JsonToBean.createCoordinatorJob(json); 1319 } 1320 else { 1321 handleError(conn); 1322 } 1323 return null; 1324 } 1325 } 1326 1327 private class WfsForCoordAction extends ClientCallable<List<WorkflowJob>> { 1328 1329 WfsForCoordAction(String coordActionId) { 1330 super("GET", RestConstants.JOB, notEmpty(coordActionId, "coordActionId"), prepareParams( 1331 RestConstants.JOB_SHOW_PARAM, RestConstants.ALL_WORKFLOWS_FOR_COORD_ACTION)); 1332 } 1333 1334 @Override 1335 protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException { 1336 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1337 Reader reader = 
new InputStreamReader(conn.getInputStream()); 1338 JSONObject json = (JSONObject) JSONValue.parse(reader); 1339 JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS); 1340 if (workflows == null) { 1341 workflows = new JSONArray(); 1342 } 1343 return JsonToBean.createWorkflowJobList(workflows); 1344 } 1345 else { 1346 handleError(conn); 1347 } 1348 return null; 1349 } 1350 } 1351 1352 1353 private class BundleJobInfo extends ClientCallable<BundleJob> { 1354 1355 BundleJobInfo(String jobId) { 1356 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 1357 RestConstants.JOB_SHOW_INFO)); 1358 } 1359 1360 @Override 1361 protected BundleJob call(HttpURLConnection conn) throws IOException, OozieClientException { 1362 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1363 Reader reader = new InputStreamReader(conn.getInputStream()); 1364 JSONObject json = (JSONObject) JSONValue.parse(reader); 1365 return JsonToBean.createBundleJob(json); 1366 } 1367 else { 1368 handleError(conn); 1369 } 1370 return null; 1371 } 1372 } 1373 1374 private class CoordActionInfo extends ClientCallable<CoordinatorAction> { 1375 CoordActionInfo(String actionId) { 1376 super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM, 1377 RestConstants.JOB_SHOW_INFO)); 1378 } 1379 1380 @Override 1381 protected CoordinatorAction call(HttpURLConnection conn) throws IOException, OozieClientException { 1382 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1383 Reader reader = new InputStreamReader(conn.getInputStream()); 1384 JSONObject json = (JSONObject) JSONValue.parse(reader); 1385 return JsonToBean.createCoordinatorAction(json); 1386 } 1387 else { 1388 handleError(conn); 1389 } 1390 return null; 1391 } 1392 } 1393 1394 /** 1395 * Get the info of a bundle job. 1396 * 1397 * @param jobId job Id. 1398 * @return the job info. 
1399 * @throws OozieClientException thrown if the job info could not be retrieved. 1400 */ 1401 public BundleJob getBundleJobInfo(String jobId) throws OozieClientException { 1402 return new BundleJobInfo(jobId).call(); 1403 } 1404 1405 /** 1406 * Get the info of a coordinator job. 1407 * 1408 * @param jobId job Id. 1409 * @return the job info. 1410 * @throws OozieClientException thrown if the job info could not be retrieved. 1411 */ 1412 public CoordinatorJob getCoordJobInfo(String jobId) throws OozieClientException { 1413 return new CoordJobInfo(jobId, null, -1, -1, "asc").call(); 1414 } 1415 1416 /** 1417 * Get the info of a coordinator job and subset actions. 1418 * 1419 * @param jobId job Id. 1420 * @param filter filter the status filter 1421 * @param start starting index in the list of actions belonging to the job 1422 * @param len number of actions to be returned 1423 * @return the job info. 1424 * @throws OozieClientException thrown if the job info could not be retrieved. 1425 */ 1426 public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len) 1427 throws OozieClientException { 1428 return new CoordJobInfo(jobId, filter, start, len, "asc").call(); 1429 } 1430 1431 /** 1432 * Get the info of a coordinator job and subset actions. 1433 * 1434 * @param jobId job Id. 1435 * @param filter filter the status filter 1436 * @param start starting index in the list of actions belonging to the job 1437 * @param len number of actions to be returned 1438 * @param order order to list coord actions (e.g, desc) 1439 * @return the job info. 1440 * @throws OozieClientException thrown if the job info could not be retrieved. 
1441 */ 1442 public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len, String order) 1443 throws OozieClientException { 1444 return new CoordJobInfo(jobId, filter, start, len, order).call(); 1445 } 1446 1447 public List<WorkflowJob> getWfsForCoordAction(String coordActionId) throws OozieClientException { 1448 return new WfsForCoordAction(coordActionId).call(); 1449 } 1450 1451 /** 1452 * Get the info of a coordinator action. 1453 * 1454 * @param actionId Id. 1455 * @return the coordinator action info. 1456 * @throws OozieClientException thrown if the job info could not be retrieved. 1457 */ 1458 public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException { 1459 return new CoordActionInfo(actionId).call(); 1460 } 1461 1462 private class JobsStatus extends ClientCallable<List<WorkflowJob>> { 1463 1464 JobsStatus(String filter, int start, int len) { 1465 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter, 1466 RestConstants.JOBTYPE_PARAM, "wf", RestConstants.OFFSET_PARAM, Integer.toString(start), 1467 RestConstants.LEN_PARAM, Integer.toString(len))); 1468 } 1469 1470 @Override 1471 protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException { 1472 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1473 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1474 Reader reader = new InputStreamReader(conn.getInputStream()); 1475 JSONObject json = (JSONObject) JSONValue.parse(reader); 1476 JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS); 1477 if (workflows == null) { 1478 workflows = new JSONArray(); 1479 } 1480 return JsonToBean.createWorkflowJobList(workflows); 1481 } 1482 else { 1483 handleError(conn); 1484 } 1485 return null; 1486 } 1487 } 1488 1489 private class CoordJobsStatus extends ClientCallable<List<CoordinatorJob>> { 1490 1491 CoordJobsStatus(String filter, int start, int len) { 
1492 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter, 1493 RestConstants.JOBTYPE_PARAM, "coord", RestConstants.OFFSET_PARAM, Integer.toString(start), 1494 RestConstants.LEN_PARAM, Integer.toString(len))); 1495 } 1496 1497 @Override 1498 protected List<CoordinatorJob> call(HttpURLConnection conn) throws IOException, OozieClientException { 1499 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1500 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1501 Reader reader = new InputStreamReader(conn.getInputStream()); 1502 JSONObject json = (JSONObject) JSONValue.parse(reader); 1503 JSONArray jobs = (JSONArray) json.get(JsonTags.COORDINATOR_JOBS); 1504 if (jobs == null) { 1505 jobs = new JSONArray(); 1506 } 1507 return JsonToBean.createCoordinatorJobList(jobs); 1508 } 1509 else { 1510 handleError(conn); 1511 } 1512 return null; 1513 } 1514 } 1515 1516 private class BundleJobsStatus extends ClientCallable<List<BundleJob>> { 1517 1518 BundleJobsStatus(String filter, int start, int len) { 1519 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter, 1520 RestConstants.JOBTYPE_PARAM, "bundle", RestConstants.OFFSET_PARAM, Integer.toString(start), 1521 RestConstants.LEN_PARAM, Integer.toString(len))); 1522 } 1523 1524 @Override 1525 protected List<BundleJob> call(HttpURLConnection conn) throws IOException, OozieClientException { 1526 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1527 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1528 Reader reader = new InputStreamReader(conn.getInputStream()); 1529 JSONObject json = (JSONObject) JSONValue.parse(reader); 1530 JSONArray jobs = (JSONArray) json.get(JsonTags.BUNDLE_JOBS); 1531 if (jobs == null) { 1532 jobs = new JSONArray(); 1533 } 1534 return JsonToBean.createBundleJobList(jobs); 1535 } 1536 else { 1537 handleError(conn); 1538 } 1539 return null; 1540 } 1541 } 1542 1543 private 
class BulkResponseStatus extends ClientCallable<List<BulkResponse>> { 1544 1545 BulkResponseStatus(String filter, int start, int len) { 1546 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_BULK_PARAM, filter, 1547 RestConstants.OFFSET_PARAM, Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len))); 1548 } 1549 1550 @Override 1551 protected List<BulkResponse> call(HttpURLConnection conn) throws IOException, OozieClientException { 1552 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1553 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1554 Reader reader = new InputStreamReader(conn.getInputStream()); 1555 JSONObject json = (JSONObject) JSONValue.parse(reader); 1556 JSONArray results = (JSONArray) json.get(JsonTags.BULK_RESPONSES); 1557 if (results == null) { 1558 results = new JSONArray(); 1559 } 1560 return JsonToBean.createBulkResponseList(results); 1561 } 1562 else { 1563 handleError(conn); 1564 } 1565 return null; 1566 } 1567 } 1568 1569 private class CoordActionsKill extends ClientCallable<List<CoordinatorAction>> { 1570 1571 CoordActionsKill(String jobId, String rangeType, String scope) { 1572 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, 1573 RestConstants.JOB_ACTION_KILL, RestConstants.JOB_COORD_RANGE_TYPE_PARAM, rangeType, 1574 RestConstants.JOB_COORD_SCOPE_PARAM, scope)); 1575 } 1576 1577 @Override 1578 protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException { 1579 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1580 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1581 Reader reader = new InputStreamReader(conn.getInputStream()); 1582 JSONObject json = (JSONObject) JSONValue.parse(reader); 1583 JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS); 1584 return JsonToBean.createCoordinatorActionList(coordActions); 1585 } 
1586 else { 1587 handleError(conn); 1588 } 1589 return null; 1590 } 1591 } 1592 1593 private class CoordIgnore extends ClientCallable<List<CoordinatorAction>> { 1594 CoordIgnore(String jobId, String rerunType, String scope) { 1595 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, 1596 RestConstants.JOB_ACTION_IGNORE, RestConstants.JOB_COORD_RANGE_TYPE_PARAM, 1597 rerunType, RestConstants.JOB_COORD_SCOPE_PARAM, scope)); 1598 } 1599 1600 @Override 1601 protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException { 1602 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1603 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1604 Reader reader = new InputStreamReader(conn.getInputStream()); 1605 JSONObject json = (JSONObject) JSONValue.parse(reader); 1606 if(json != null) { 1607 JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS); 1608 return JsonToBean.createCoordinatorActionList(coordActions); 1609 } 1610 } 1611 else { 1612 handleError(conn); 1613 } 1614 return null; 1615 } 1616 } 1617 private class CoordRerun extends ClientCallable<List<CoordinatorAction>> { 1618 private final Properties conf; 1619 1620 CoordRerun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup, boolean failed, 1621 Properties conf) { 1622 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, 1623 RestConstants.JOB_COORD_ACTION_RERUN, RestConstants.JOB_COORD_RANGE_TYPE_PARAM, rerunType, 1624 RestConstants.JOB_COORD_SCOPE_PARAM, scope, RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, 1625 Boolean.toString(refresh), RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean 1626 .toString(noCleanup), RestConstants.JOB_COORD_RERUN_FAILED_PARAM, Boolean.toString(failed))); 1627 this.conf = conf; 1628 } 1629 1630 @Override 1631 protected List<CoordinatorAction> call(HttpURLConnection 
conn) throws IOException, OozieClientException { 1632 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1633 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1634 Reader reader = new InputStreamReader(conn.getInputStream()); 1635 JSONObject json = (JSONObject) JSONValue.parse(reader); 1636 JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS); 1637 return JsonToBean.createCoordinatorActionList(coordActions); 1638 } 1639 else { 1640 handleError(conn); 1641 } 1642 return null; 1643 } 1644 } 1645 1646 private class BundleRerun extends ClientCallable<Void> { 1647 1648 BundleRerun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) { 1649 super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, 1650 RestConstants.JOB_BUNDLE_ACTION_RERUN, RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM, 1651 coordScope, RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM, dateScope, 1652 RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, Boolean.toString(refresh), 1653 RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean.toString(noCleanup))); 1654 } 1655 1656 @Override 1657 protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { 1658 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1659 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1660 return null; 1661 } 1662 else { 1663 handleError(conn); 1664 } 1665 return null; 1666 } 1667 } 1668 1669 /** 1670 * Rerun coordinator actions. 
1671 * 1672 * @param jobId coordinator jobId 1673 * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used 1674 * @param scope rerun scope for date or actionIds 1675 * @param refresh true if -refresh is given in command option 1676 * @param noCleanup true if -nocleanup is given in command option 1677 * @throws OozieClientException 1678 */ 1679 public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh, 1680 boolean noCleanup) throws OozieClientException { 1681 return new CoordRerun(jobId, rerunType, scope, refresh, noCleanup, false, null).call(); 1682 } 1683 1684 /** 1685 * Rerun coordinator actions with failed option. 1686 * 1687 * @param jobId coordinator jobId 1688 * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used 1689 * @param scope rerun scope for date or actionIds 1690 * @param refresh true if -refresh is given in command option 1691 * @param noCleanup true if -nocleanup is given in command option 1692 * @param failed true if -failed is given in command option 1693 * @throws OozieClientException 1694 */ 1695 public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh, 1696 boolean noCleanup, boolean failed, Properties props) throws OozieClientException { 1697 return new CoordRerun(jobId, rerunType, scope, refresh, noCleanup, failed, props).call(); 1698 } 1699 1700 /** 1701 * Rerun bundle coordinators. 
1702 * 1703 * @param jobId bundle jobId 1704 * @param coordScope rerun scope for coordinator jobs 1705 * @param dateScope rerun scope for date 1706 * @param refresh true if -refresh is given in command option 1707 * @param noCleanup true if -nocleanup is given in command option 1708 * @throws OozieClientException 1709 */ 1710 public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) 1711 throws OozieClientException { 1712 return new BundleRerun(jobId, coordScope, dateScope, refresh, noCleanup).call(); 1713 } 1714 1715 /** 1716 * Return the info of the workflow jobs that match the filter. 1717 * 1718 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. 1719 * @param start jobs offset, base 1. 1720 * @param len number of jobs to return. 1721 * @return a list with the workflow jobs info, without node details. 1722 * @throws OozieClientException thrown if the jobs info could not be retrieved. 1723 */ 1724 public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException { 1725 return new JobsStatus(filter, start, len).call(); 1726 } 1727 1728 /** 1729 * Return the info of the workflow jobs that match the filter. 1730 * <p> 1731 * It returns the first 100 jobs that match the filter. 1732 * 1733 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. 1734 * @return a list with the workflow jobs info, without node details. 1735 * @throws OozieClientException thrown if the jobs info could not be retrieved. 1736 */ 1737 public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException { 1738 return getJobsInfo(filter, 1, 50); 1739 } 1740 1741 /** 1742 * Sla enable alert. 
1743 * 1744 * @param jobIds the job ids 1745 * @param actions comma separated list of action ids or action id ranges 1746 * @param dates comma separated list of the nominal times 1747 * @throws OozieClientException the oozie client exception 1748 */ 1749 public void slaEnableAlert(String jobIds, String actions, String dates) throws OozieClientException { 1750 new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, jobIds, actions, dates, null).call(); 1751 } 1752 1753 /** 1754 * Sla enable alert for bundle with coord name/id. 1755 * 1756 * @param bundleId the bundle id 1757 * @param actions comma separated list of action ids or action id ranges 1758 * @param dates comma separated list of the nominal times 1759 * @param coords the coordinators 1760 * @throws OozieClientException the oozie client exception 1761 */ 1762 public void slaEnableAlert(String bundleId, String actions, String dates, String coords) 1763 throws OozieClientException { 1764 new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, bundleId, actions, dates, coords).call(); 1765 } 1766 1767 /** 1768 * Sla disable alert. 1769 * 1770 * @param jobIds the job ids 1771 * @param actions comma separated list of action ids or action id ranges 1772 * @param dates comma separated list of the nominal times 1773 * @throws OozieClientException the oozie client exception 1774 */ 1775 public void slaDisableAlert(String jobIds, String actions, String dates) throws OozieClientException { 1776 new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, jobIds, actions, dates, null).call(); 1777 } 1778 1779 /** 1780 * Sla disable alert for bundle with coord name/id. 
1781 * 1782 * @param bundleId the bundle id 1783 * @param actions comma separated list of action ids or action id ranges 1784 * @param dates comma separated list of the nominal times 1785 * @param coords the coordinators 1786 * @throws OozieClientException the oozie client exception 1787 */ 1788 public void slaDisableAlert(String bundleId, String actions, String dates, String coords) 1789 throws OozieClientException { 1790 new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, bundleId, actions, dates, coords).call(); 1791 } 1792 1793 /** 1794 * Sla change definations. 1795 * SLA change definition parameters can be [<key>=<value>,...<key>=<value>] 1796 * Supported parameter key names are should-start, should-end and max-duration 1797 * @param jobIds the job ids 1798 * @param actions comma separated list of action ids or action id ranges. 1799 * @param dates comma separated list of the nominal times 1800 * @param newSlaParams the new sla params 1801 * @throws OozieClientException the oozie client exception 1802 */ 1803 public void slaChange(String jobIds, String actions, String dates, String newSlaParams) throws OozieClientException { 1804 new UpdateSLA(RestConstants.SLA_CHANGE, jobIds, actions, dates, null, newSlaParams).call(); 1805 } 1806 1807 /** 1808 * Sla change defination for bundle with coord name/id. 
1809 * SLA change definition parameters can be [<key>=<value>,...<key>=<value>] 1810 * Supported parameter key names are should-start, should-end and max-duration 1811 * @param bundleId the bundle id 1812 * @param actions comma separated list of action ids or action id ranges 1813 * @param dates comma separated list of the nominal times 1814 * @param coords the coords 1815 * @param newSlaParams the new sla params 1816 * @throws OozieClientException the oozie client exception 1817 */ 1818 public void slaChange(String bundleId, String actions, String dates, String coords, String newSlaParams) 1819 throws OozieClientException { 1820 new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, newSlaParams).call(); 1821 } 1822 1823 /** 1824 * Sla change with new sla param as hasmap. 1825 * Supported parameter key names are should-start, should-end and max-duration 1826 * @param bundleId the bundle id 1827 * @param actions comma separated list of action ids or action id ranges 1828 * @param dates comma separated list of the nominal times 1829 * @param coords the coords 1830 * @param newSlaParams the new sla params 1831 * @throws OozieClientException the oozie client exception 1832 */ 1833 public void slaChange(String bundleId, String actions, String dates, String coords, Map<String, String> newSlaParams) 1834 throws OozieClientException { 1835 new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, mapToString(newSlaParams)).call(); 1836 } 1837 1838 /** 1839 * Convert Map to string. 
1840 * 1841 * @param map the map 1842 * @return the string 1843 */ 1844 private String mapToString(Map<String, String> map) { 1845 StringBuilder sb = new StringBuilder(); 1846 Iterator<Entry<String, String>> it = map.entrySet().iterator(); 1847 while (it.hasNext()) { 1848 Entry<String, String> e = (Entry<String, String>) it.next(); 1849 sb.append(e.getKey()).append("=").append(e.getValue()).append(";"); 1850 } 1851 return sb.toString(); 1852 } 1853 1854 private class UpdateSLA extends ClientCallable<Void> { 1855 1856 UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords) { 1857 super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"), prepareParams(RestConstants.ACTION_PARAM, 1858 action, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions, RestConstants.JOB_COORD_SCOPE_DATE, 1859 dates, RestConstants.COORDINATORS_PARAM, coords)); 1860 } 1861 1862 UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords, String newSlaParams) { 1863 super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"), prepareParams(RestConstants.ACTION_PARAM, 1864 action, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions, RestConstants.JOB_COORD_SCOPE_DATE, 1865 dates, RestConstants.COORDINATORS_PARAM, coords, RestConstants.JOB_CHANGE_VALUE, newSlaParams)); 1866 } 1867 1868 @Override 1869 protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { 1870 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1871 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1872 System.out.println("Done"); 1873 } 1874 else { 1875 handleError(conn); 1876 } 1877 return null; 1878 } 1879 } 1880 1881 /** 1882 * Print sla info about coordinator and workflow jobs and actions. 
1883 * 1884 * @param start starting offset 1885 * @param len number of results 1886 * @throws OozieClientException 1887 */ 1888 public void getSlaInfo(int start, int len, String filter) throws OozieClientException { 1889 new SlaInfo(start, len, filter).call(); 1890 } 1891 1892 private class SlaInfo extends ClientCallable<Void> { 1893 1894 SlaInfo(int start, int len, String filter) { 1895 super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID, 1896 Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len), 1897 RestConstants.JOBS_FILTER_PARAM, filter)); 1898 } 1899 1900 @Override 1901 protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { 1902 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 1903 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1904 BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream())); 1905 String line = null; 1906 while ((line = br.readLine()) != null) { 1907 System.out.println(line); 1908 } 1909 } 1910 else { 1911 handleError(conn); 1912 } 1913 return null; 1914 } 1915 } 1916 1917 private class JobIdAction extends ClientCallable<String> { 1918 1919 JobIdAction(String externalId) { 1920 super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, "wf", 1921 RestConstants.JOBS_EXTERNAL_ID_PARAM, externalId)); 1922 } 1923 1924 @Override 1925 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 1926 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1927 Reader reader = new InputStreamReader(conn.getInputStream()); 1928 JSONObject json = (JSONObject) JSONValue.parse(reader); 1929 return (String) json.get(JsonTags.JOB_ID); 1930 } 1931 else { 1932 handleError(conn); 1933 } 1934 return null; 1935 } 1936 } 1937 1938 /** 1939 * Return the workflow job Id for an external Id. 
1940 * <p> 1941 * The external Id must have provided at job creation time. 1942 * 1943 * @param externalId external Id given at job creation time. 1944 * @return the workflow job Id for an external Id, <code>null</code> if none. 1945 * @throws OozieClientException thrown if the operation could not be done. 1946 */ 1947 public String getJobId(String externalId) throws OozieClientException { 1948 return new JobIdAction(externalId).call(); 1949 } 1950 1951 private class SetSystemMode extends ClientCallable<Void> { 1952 1953 public SetSystemMode(SYSTEM_MODE status) { 1954 super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams( 1955 RestConstants.ADMIN_SYSTEM_MODE_PARAM, status + "")); 1956 } 1957 1958 @Override 1959 public Void call(HttpURLConnection conn) throws IOException, OozieClientException { 1960 if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { 1961 handleError(conn); 1962 } 1963 return null; 1964 } 1965 } 1966 1967 /** 1968 * Enable or disable safe mode. Used by OozieCLI. In safe mode, Oozie would not accept any commands except status 1969 * command to change and view the safe mode status. 1970 * 1971 * @param status true to enable safe mode, false to disable safe mode. 1972 * @throws OozieClientException if it fails to set the safe mode status. 
1973 */ 1974 public void setSystemMode(SYSTEM_MODE status) throws OozieClientException { 1975 new SetSystemMode(status).call(); 1976 } 1977 1978 private class GetSystemMode extends ClientCallable<SYSTEM_MODE> { 1979 1980 GetSystemMode() { 1981 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams()); 1982 } 1983 1984 @Override 1985 protected SYSTEM_MODE call(HttpURLConnection conn) throws IOException, OozieClientException { 1986 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 1987 Reader reader = new InputStreamReader(conn.getInputStream()); 1988 JSONObject json = (JSONObject) JSONValue.parse(reader); 1989 return SYSTEM_MODE.valueOf((String) json.get(JsonTags.OOZIE_SYSTEM_MODE)); 1990 } 1991 else { 1992 handleError(conn); 1993 } 1994 return SYSTEM_MODE.NORMAL; 1995 } 1996 } 1997 1998 /** 1999 * Returns if Oozie is in safe mode or not. 2000 * 2001 * @return true if safe mode is ON<br> 2002 * false if safe mode is OFF 2003 * @throws OozieClientException throw if it could not obtain the safe mode status. 
2004 */ 2005 /* 2006 * public boolean isInSafeMode() throws OozieClientException { return new GetSafeMode().call(); } 2007 */ 2008 public SYSTEM_MODE getSystemMode() throws OozieClientException { 2009 return new GetSystemMode().call(); 2010 } 2011 2012 public String updateShareLib() throws OozieClientException { 2013 return new UpdateSharelib().call(); 2014 } 2015 2016 public String listShareLib(String sharelibKey) throws OozieClientException { 2017 return new ListShareLib(sharelibKey).call(); 2018 } 2019 2020 private class GetBuildVersion extends ClientCallable<String> { 2021 2022 GetBuildVersion() { 2023 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_BUILD_VERSION_RESOURCE, prepareParams()); 2024 } 2025 2026 @Override 2027 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 2028 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 2029 Reader reader = new InputStreamReader(conn.getInputStream()); 2030 JSONObject json = (JSONObject) JSONValue.parse(reader); 2031 return (String) json.get(JsonTags.BUILD_VERSION); 2032 } 2033 else { 2034 handleError(conn); 2035 } 2036 return null; 2037 } 2038 } 2039 2040 private class ValidateXML extends ClientCallable<String> { 2041 2042 String file = null; 2043 2044 ValidateXML(String file, String user) { 2045 super("POST", RestConstants.VALIDATE, "", 2046 prepareParams(RestConstants.FILE_PARAM, file, RestConstants.USER_PARAM, user)); 2047 this.file = file; 2048 } 2049 2050 @Override 2051 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 2052 conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); 2053 if (file.startsWith("/")) { 2054 FileInputStream fi = new FileInputStream(new File(file)); 2055 byte[] buffer = new byte[1024]; 2056 int n = 0; 2057 while (-1 != (n = fi.read(buffer))) { 2058 conn.getOutputStream().write(buffer, 0, n); 2059 } 2060 } 2061 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 2062 Reader 
reader = new InputStreamReader(conn.getInputStream()); 2063 JSONObject json = (JSONObject) JSONValue.parse(reader); 2064 return (String) json.get(JsonTags.VALIDATE); 2065 } 2066 else if ((conn.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND)) { 2067 return null; 2068 } 2069 else { 2070 handleError(conn); 2071 } 2072 return null; 2073 } 2074 } 2075 2076 2077 private class UpdateSharelib extends ClientCallable<String> { 2078 2079 UpdateSharelib() { 2080 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_UPDATE_SHARELIB, prepareParams( 2081 RestConstants.ALL_SERVER_REQUEST, "true")); 2082 } 2083 2084 @Override 2085 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 2086 StringBuffer bf = new StringBuffer(); 2087 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 2088 Reader reader = new InputStreamReader(conn.getInputStream()); 2089 Object sharelib = (Object) JSONValue.parse(reader); 2090 bf.append("[ShareLib update status]").append(System.getProperty("line.separator")); 2091 if (sharelib instanceof JSONArray) { 2092 for (Object o : ((JSONArray) sharelib)) { 2093 JSONObject obj = (JSONObject) ((JSONObject) o).get(JsonTags.SHARELIB_LIB_UPDATE); 2094 for (Object key : obj.keySet()) { 2095 bf.append("\t").append(key).append(" = ").append(obj.get(key)) 2096 .append(System.getProperty("line.separator")); 2097 } 2098 bf.append(System.getProperty("line.separator")); 2099 } 2100 } 2101 else{ 2102 JSONObject obj = (JSONObject) ((JSONObject) sharelib).get(JsonTags.SHARELIB_LIB_UPDATE); 2103 for (Object key : obj.keySet()) { 2104 bf.append("\t").append(key).append(" = ").append(obj.get(key)) 2105 .append(System.getProperty("line.separator")); 2106 } 2107 bf.append(System.getProperty("line.separator")); 2108 } 2109 return bf.toString(); 2110 } 2111 else { 2112 handleError(conn); 2113 } 2114 return null; 2115 } 2116 } 2117 2118 private class ListShareLib extends ClientCallable<String> { 2119 2120 ListShareLib(String 
sharelibKey) { 2121 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_LIST_SHARELIB, prepareParams( 2122 RestConstants.SHARE_LIB_REQUEST_KEY, sharelibKey)); 2123 } 2124 2125 @Override 2126 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 2127 2128 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 2129 StringBuffer bf = new StringBuffer(); 2130 Reader reader = new InputStreamReader(conn.getInputStream()); 2131 JSONObject json = (JSONObject) JSONValue.parse(reader); 2132 Object sharelib = json.get(JsonTags.SHARELIB_LIB); 2133 bf.append("[Available ShareLib]").append(System.getProperty("line.separator")); 2134 if (sharelib instanceof JSONArray) { 2135 for (Object o : ((JSONArray) sharelib)) { 2136 JSONObject obj = (JSONObject) o; 2137 bf.append(obj.get(JsonTags.SHARELIB_LIB_NAME)) 2138 .append(System.getProperty("line.separator")); 2139 if (obj.get(JsonTags.SHARELIB_LIB_FILES) != null) { 2140 for (Object file : ((JSONArray) obj.get(JsonTags.SHARELIB_LIB_FILES))) { 2141 bf.append("\t").append(file).append(System.getProperty("line.separator")); 2142 } 2143 } 2144 } 2145 return bf.toString(); 2146 } 2147 } 2148 else { 2149 handleError(conn); 2150 } 2151 return null; 2152 } 2153 2154 } 2155 2156 /** 2157 * Return the Oozie server build version. 2158 * 2159 * @return the Oozie server build version. 2160 * @throws OozieClientException throw if it the server build version could not be retrieved. 2161 */ 2162 public String getServerBuildVersion() throws OozieClientException { 2163 return new GetBuildVersion().call(); 2164 } 2165 2166 /** 2167 * Return the Oozie client build version. 2168 * 2169 * @return the Oozie client build version. 2170 */ 2171 public String getClientBuildVersion() { 2172 return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION); 2173 } 2174 2175 /** 2176 * Return the workflow application is valid. 2177 * 2178 * @param file local file or hdfs file. 
2179 * @return the workflow application is valid. 2180 * @throws OozieClientException throw if it the workflow application's validation could not be retrieved. 2181 */ 2182 public String validateXML(String file) throws OozieClientException { 2183 String fileName = file; 2184 if (file.startsWith("file://")) { 2185 fileName = file.substring(7, file.length()); 2186 } 2187 if (!fileName.contains("://")) { 2188 File f = new File(fileName); 2189 if (!f.isFile()) { 2190 throw new OozieClientException("File error", "File does not exist : " + f.getAbsolutePath()); 2191 } 2192 fileName = f.getAbsolutePath(); 2193 } 2194 String user = USER_NAME_TL.get(); 2195 if (user == null) { 2196 user = System.getProperty("user.name"); 2197 } 2198 return new ValidateXML(fileName, user).call(); 2199 } 2200 2201 /** 2202 * Return the info of the coordinator jobs that match the filter. 2203 * 2204 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. 2205 * @param start jobs offset, base 1. 2206 * @param len number of jobs to return. 2207 * @return a list with the coordinator jobs info 2208 * @throws OozieClientException thrown if the jobs info could not be retrieved. 2209 */ 2210 public List<CoordinatorJob> getCoordJobsInfo(String filter, int start, int len) throws OozieClientException { 2211 return new CoordJobsStatus(filter, start, len).call(); 2212 } 2213 2214 /** 2215 * Return the info of the bundle jobs that match the filter. 2216 * 2217 * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax. 2218 * @param start jobs offset, base 1. 2219 * @param len number of jobs to return. 2220 * @return a list with the bundle jobs info 2221 * @throws OozieClientException thrown if the jobs info could not be retrieved. 
2222 */ 2223 public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException { 2224 return new BundleJobsStatus(filter, start, len).call(); 2225 } 2226 2227 public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException { 2228 return new BulkResponseStatus(filter, start, len).call(); 2229 } 2230 2231 /** 2232 * Poll a job (Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID) and return when it has reached a 2233 * terminal state. 2234 * (i.e. FAILED, KILLED, SUCCEEDED) 2235 * 2236 * @param id The Job ID 2237 * @param timeout timeout in minutes (negative values indicate no timeout) 2238 * @param interval polling interval in minutes (must be positive) 2239 * @param verbose if true, the current status will be printed out at each poll; if false, no output 2240 * @throws OozieClientException thrown if the job's status could not be retrieved 2241 */ 2242 public void pollJob(String id, int timeout, int interval, boolean verbose) throws OozieClientException { 2243 notEmpty("id", id); 2244 if (interval < 1) { 2245 throw new IllegalArgumentException("interval must be a positive integer"); 2246 } 2247 boolean noTimeout = (timeout < 1); 2248 long endTime = System.currentTimeMillis() + timeout * 60 * 1000; 2249 interval *= 60 * 1000; 2250 2251 final Set<String> completedStatuses; 2252 if (id.endsWith("-W")) { 2253 completedStatuses = COMPLETED_WF_STATUSES; 2254 } else if (id.endsWith("-C")) { 2255 completedStatuses = COMPLETED_COORD_AND_BUNDLE_STATUSES; 2256 } else if (id.endsWith("-B")) { 2257 completedStatuses = COMPLETED_COORD_AND_BUNDLE_STATUSES; 2258 } else if (id.contains("-C@")) { 2259 completedStatuses = COMPLETED_COORD_ACTION_STATUSES; 2260 } else { 2261 throw new IllegalArgumentException("invalid job type"); 2262 } 2263 2264 String status = getStatus(id); 2265 if (verbose) { 2266 System.out.println(status); 2267 } 2268 while(!completedStatuses.contains(status) && 
(noTimeout || System.currentTimeMillis() <= endTime)) { 2269 try { 2270 Thread.sleep(interval); 2271 } catch (InterruptedException ie) { 2272 // ignore 2273 } 2274 status = getStatus(id); 2275 if (verbose) { 2276 System.out.println(status); 2277 } 2278 } 2279 } 2280 2281 /** 2282 * Gets the status for a particular job (Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID). 2283 * 2284 * @param jobId given jobId 2285 * @return the status 2286 * @throws OozieClientException 2287 */ 2288 public String getStatus(String jobId) throws OozieClientException { 2289 return new Status(jobId).call(); 2290 } 2291 2292 private class Status extends ClientCallable<String> { 2293 2294 Status(String jobId) { 2295 super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, 2296 RestConstants.JOB_SHOW_STATUS)); 2297 } 2298 2299 @Override 2300 protected String call(HttpURLConnection conn) throws IOException, OozieClientException { 2301 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 2302 Reader reader = new InputStreamReader(conn.getInputStream()); 2303 JSONObject json = (JSONObject) JSONValue.parse(reader); 2304 return (String) json.get(JsonTags.STATUS); 2305 } 2306 else { 2307 handleError(conn); 2308 } 2309 return null; 2310 } 2311 } 2312 2313 private class GetQueueDump extends ClientCallable<List<String>> { 2314 GetQueueDump() { 2315 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams()); 2316 } 2317 2318 @Override 2319 protected List<String> call(HttpURLConnection conn) throws IOException, OozieClientException { 2320 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 2321 Reader reader = new InputStreamReader(conn.getInputStream()); 2322 JSONObject json = (JSONObject) JSONValue.parse(reader); 2323 JSONArray queueDumpArray = (JSONArray) json.get(JsonTags.QUEUE_DUMP); 2324 2325 List<String> list = new ArrayList<String>(); 2326 list.add("[Server Queue Dump]:"); 
2327 for (Object o : queueDumpArray) { 2328 JSONObject entry = (JSONObject) o; 2329 if (entry.get(JsonTags.CALLABLE_DUMP) != null) { 2330 String value = (String) entry.get(JsonTags.CALLABLE_DUMP); 2331 list.add(value); 2332 } 2333 } 2334 if (queueDumpArray.size() == 0) { 2335 list.add("Queue dump is null!"); 2336 } 2337 2338 list.add("******************************************"); 2339 list.add("[Server Uniqueness Map Dump]:"); 2340 2341 JSONArray uniqueDumpArray = (JSONArray) json.get(JsonTags.UNIQUE_MAP_DUMP); 2342 for (Object o : uniqueDumpArray) { 2343 JSONObject entry = (JSONObject) o; 2344 if (entry.get(JsonTags.UNIQUE_ENTRY_DUMP) != null) { 2345 String value = (String) entry.get(JsonTags.UNIQUE_ENTRY_DUMP); 2346 list.add(value); 2347 } 2348 } 2349 if (uniqueDumpArray.size() == 0) { 2350 list.add("Uniqueness dump is null!"); 2351 } 2352 return list; 2353 } 2354 else { 2355 handleError(conn); 2356 } 2357 return null; 2358 } 2359 } 2360 2361 /** 2362 * Return the Oozie queue's commands' dump 2363 * 2364 * @return the list of strings of callable identification in queue 2365 * @throws OozieClientException throw if it the queue dump could not be retrieved. 2366 */ 2367 public List<String> getQueueDump() throws OozieClientException { 2368 return new GetQueueDump().call(); 2369 } 2370 2371 private class GetAvailableOozieServers extends MapClientCallable { 2372 2373 GetAvailableOozieServers() { 2374 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_AVAILABLE_OOZIE_SERVERS_RESOURCE, prepareParams()); 2375 } 2376 } 2377 2378 /** 2379 * Return the list of available Oozie servers. 2380 * 2381 * @return the list of available Oozie servers. 2382 * @throws OozieClientException throw if it the list of available Oozie servers could not be retrieved. 
2383 */ 2384 public Map<String, String> getAvailableOozieServers() throws OozieClientException { 2385 return new GetAvailableOozieServers().call(); 2386 } 2387 2388 private class GetServerConfiguration extends MapClientCallable { 2389 2390 GetServerConfiguration() { 2391 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_CONFIG_RESOURCE, prepareParams()); 2392 } 2393 } 2394 2395 /** 2396 * Return the Oozie system configuration. 2397 * 2398 * @return the Oozie system configuration. 2399 * @throws OozieClientException throw if the system configuration could not be retrieved. 2400 */ 2401 public Map<String, String> getServerConfiguration() throws OozieClientException { 2402 return new GetServerConfiguration().call(); 2403 } 2404 2405 private class GetJavaSystemProperties extends MapClientCallable { 2406 2407 GetJavaSystemProperties() { 2408 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JAVA_SYS_PROPS_RESOURCE, prepareParams()); 2409 } 2410 } 2411 2412 /** 2413 * Return the Oozie Java system properties. 2414 * 2415 * @return the Oozie Java system properties. 2416 * @throws OozieClientException throw if the system properties could not be retrieved. 2417 */ 2418 public Map<String, String> getJavaSystemProperties() throws OozieClientException { 2419 return new GetJavaSystemProperties().call(); 2420 } 2421 2422 private class GetOSEnv extends MapClientCallable { 2423 2424 GetOSEnv() { 2425 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_OS_ENV_RESOURCE, prepareParams()); 2426 } 2427 } 2428 2429 /** 2430 * Return the Oozie system OS environment. 2431 * 2432 * @return the Oozie system OS environment. 2433 * @throws OozieClientException throw if the system OS environment could not be retrieved. 
2434 */ 2435 public Map<String, String> getOSEnv() throws OozieClientException { 2436 return new GetOSEnv().call(); 2437 } 2438 2439 private class GetMetrics extends ClientCallable<Metrics> { 2440 2441 GetMetrics() { 2442 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_METRICS_RESOURCE, prepareParams()); 2443 } 2444 2445 @Override 2446 protected Metrics call(HttpURLConnection conn) throws IOException, OozieClientException { 2447 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 2448 Reader reader = new InputStreamReader(conn.getInputStream()); 2449 JSONObject json = (JSONObject) JSONValue.parse(reader); 2450 Metrics metrics = new Metrics(json); 2451 return metrics; 2452 } 2453 else if ((conn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE)) { 2454 // Use Instrumentation endpoint 2455 return null; 2456 } 2457 else { 2458 handleError(conn); 2459 } 2460 return null; 2461 } 2462 } 2463 2464 public class Metrics { 2465 private Map<String, Long> counters; 2466 private Map<String, Object> gauges; 2467 private Map<String, Timer> timers; 2468 private Map<String, Histogram> histograms; 2469 2470 @SuppressWarnings("unchecked") 2471 public Metrics(JSONObject json) { 2472 JSONObject jCounters = (JSONObject) json.get("counters"); 2473 counters = new HashMap<String, Long>(jCounters.size()); 2474 for (Object entO : jCounters.entrySet()) { 2475 Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO; 2476 counters.put(ent.getKey(), (Long)ent.getValue().get("count")); 2477 } 2478 2479 JSONObject jGuages = (JSONObject) json.get("gauges"); 2480 gauges = new HashMap<String, Object>(jGuages.size()); 2481 for (Object entO : jGuages.entrySet()) { 2482 Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO; 2483 gauges.put(ent.getKey(), ent.getValue().get("value")); 2484 } 2485 2486 JSONObject jTimers = (JSONObject) json.get("timers"); 2487 timers = new HashMap<String, Timer>(jTimers.size()); 2488 for (Object entO : jTimers.entrySet()) { 2489 
Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO; 2490 timers.put(ent.getKey(), new Timer(ent.getValue())); 2491 } 2492 2493 JSONObject jHistograms = (JSONObject) json.get("histograms"); 2494 histograms = new HashMap<String, Histogram>(jHistograms.size()); 2495 for (Object entO : jHistograms.entrySet()) { 2496 Entry<String, JSONObject> ent = (Entry<String, JSONObject>) entO; 2497 histograms.put(ent.getKey(), new Histogram(ent.getValue())); 2498 } 2499 } 2500 2501 public Map<String, Long> getCounters() { 2502 return counters; 2503 } 2504 2505 public Map<String, Object> getGauges() { 2506 return gauges; 2507 } 2508 2509 public Map<String, Timer> getTimers() { 2510 return timers; 2511 } 2512 2513 public Map<String, Histogram> getHistograms() { 2514 return histograms; 2515 } 2516 2517 public class Timer extends Histogram { 2518 private double m15Rate; 2519 private double m5Rate; 2520 private double m1Rate; 2521 private double meanRate; 2522 private String durationUnits; 2523 private String rateUnits; 2524 2525 public Timer(JSONObject json) { 2526 super(json); 2527 m15Rate = Double.valueOf(json.get("m15_rate").toString()); 2528 m5Rate = Double.valueOf(json.get("m5_rate").toString()); 2529 m1Rate = Double.valueOf(json.get("m1_rate").toString()); 2530 meanRate = Double.valueOf(json.get("mean_rate").toString()); 2531 durationUnits = json.get("duration_units").toString(); 2532 rateUnits = json.get("rate_units").toString(); 2533 } 2534 2535 public double get15MinuteRate() { 2536 return m15Rate; 2537 } 2538 2539 public double get5MinuteRate() { 2540 return m5Rate; 2541 } 2542 2543 public double get1MinuteRate() { 2544 return m1Rate; 2545 } 2546 2547 public double getMeanRate() { 2548 return meanRate; 2549 } 2550 2551 public String getDurationUnits() { 2552 return durationUnits; 2553 } 2554 2555 public String getRateUnits() { 2556 return rateUnits; 2557 } 2558 2559 @Override 2560 public String toString() { 2561 StringBuilder sb = new 
StringBuilder(super.toString()); 2562 sb.append("\n\t15 minute rate : ").append(m15Rate); 2563 sb.append("\n\t5 minute rate : ").append(m5Rate); 2564 sb.append("\n\t1 minute rate : ").append(m15Rate); 2565 sb.append("\n\tmean rate : ").append(meanRate); 2566 sb.append("\n\tduration units : ").append(durationUnits); 2567 sb.append("\n\trate units : ").append(rateUnits); 2568 return sb.toString(); 2569 } 2570 } 2571 2572 public class Histogram { 2573 private double p999; 2574 private double p99; 2575 private double p98; 2576 private double p95; 2577 private double p75; 2578 private double p50; 2579 private double mean; 2580 private double min; 2581 private double max; 2582 private double stdDev; 2583 private long count; 2584 2585 public Histogram(JSONObject json) { 2586 p999 = Double.valueOf(json.get("p999").toString()); 2587 p99 = Double.valueOf(json.get("p99").toString()); 2588 p98 = Double.valueOf(json.get("p98").toString()); 2589 p95 = Double.valueOf(json.get("p95").toString()); 2590 p75 = Double.valueOf(json.get("p75").toString()); 2591 p50 = Double.valueOf(json.get("p50").toString()); 2592 mean = Double.valueOf(json.get("mean").toString()); 2593 min = Double.valueOf(json.get("min").toString()); 2594 max = Double.valueOf(json.get("max").toString()); 2595 stdDev = Double.valueOf(json.get("stddev").toString()); 2596 count = Long.valueOf(json.get("count").toString()); 2597 } 2598 2599 public double get999thPercentile() { 2600 return p999; 2601 } 2602 2603 public double get99thPercentile() { 2604 return p99; 2605 } 2606 2607 public double get98thPercentile() { 2608 return p98; 2609 } 2610 2611 public double get95thPercentile() { 2612 return p95; 2613 } 2614 2615 public double get75thPercentile() { 2616 return p75; 2617 } 2618 2619 public double get50thPercentile() { 2620 return p50; 2621 } 2622 2623 public double getMean() { 2624 return mean; 2625 } 2626 2627 public double getMin() { 2628 return min; 2629 } 2630 2631 public double getMax() { 2632 return max; 2633 } 
2634 2635 public double getStandardDeviation() { 2636 return stdDev; 2637 } 2638 2639 public long getCount() { 2640 return count; 2641 } 2642 2643 @Override 2644 public String toString() { 2645 StringBuilder sb = new StringBuilder(); 2646 sb.append("\t999th percentile : ").append(p999); 2647 sb.append("\n\t99th percentile : ").append(p99); 2648 sb.append("\n\t98th percentile : ").append(p98); 2649 sb.append("\n\t95th percentile : ").append(p95); 2650 sb.append("\n\t75th percentile : ").append(p75); 2651 sb.append("\n\t50th percentile : ").append(p50); 2652 sb.append("\n\tmean : ").append(mean); 2653 sb.append("\n\tmax : ").append(max); 2654 sb.append("\n\tmin : ").append(min); 2655 sb.append("\n\tcount : ").append(count); 2656 sb.append("\n\tstandard deviation : ").append(stdDev); 2657 return sb.toString(); 2658 } 2659 } 2660 } 2661 2662 /** 2663 * Return the Oozie metrics. If null is returned, then try {@link #getInstrumentation()}. 2664 * 2665 * @return the Oozie metrics or null. 2666 * @throws OozieClientException throw if the metrics could not be retrieved. 
2667 */ 2668 public Metrics getMetrics() throws OozieClientException { 2669 return new GetMetrics().call(); 2670 } 2671 2672 private class GetInstrumentation extends ClientCallable<Instrumentation> { 2673 2674 GetInstrumentation() { 2675 super("GET", RestConstants.ADMIN, RestConstants.ADMIN_INSTRUMENTATION_RESOURCE, prepareParams()); 2676 } 2677 2678 @Override 2679 protected Instrumentation call(HttpURLConnection conn) throws IOException, OozieClientException { 2680 if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { 2681 Reader reader = new InputStreamReader(conn.getInputStream()); 2682 JSONObject json = (JSONObject) JSONValue.parse(reader); 2683 Instrumentation instrumentation = new Instrumentation(json); 2684 return instrumentation; 2685 } 2686 else if ((conn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE)) { 2687 // Use Metrics endpoint 2688 return null; 2689 } 2690 else { 2691 handleError(conn); 2692 } 2693 return null; 2694 } 2695 } 2696 2697 public class Instrumentation { 2698 private Map<String, Long> counters; 2699 private Map<String, Object> variables; 2700 private Map<String, Double> samplers; 2701 private Map<String, Timer> timers; 2702 2703 public Instrumentation(JSONObject json) { 2704 JSONArray jCounters = (JSONArray) json.get("counters"); 2705 counters = new HashMap<String, Long>(jCounters.size()); 2706 for (Object groupO : jCounters) { 2707 JSONObject group = (JSONObject) groupO; 2708 String groupName = group.get("group").toString() + "."; 2709 JSONArray data = (JSONArray) group.get("data"); 2710 for (Object datO : data) { 2711 JSONObject dat = (JSONObject) datO; 2712 counters.put(groupName + dat.get("name").toString(), Long.valueOf(dat.get("value").toString())); 2713 } 2714 } 2715 2716 JSONArray jVariables = (JSONArray) json.get("variables"); 2717 variables = new HashMap<String, Object>(jVariables.size()); 2718 for (Object groupO : jVariables) { 2719 JSONObject group = (JSONObject) groupO; 2720 String groupName = 
group.get("group").toString() + "."; 2721 JSONArray data = (JSONArray) group.get("data"); 2722 for (Object datO : data) { 2723 JSONObject dat = (JSONObject) datO; 2724 variables.put(groupName + dat.get("name").toString(), dat.get("value")); 2725 } 2726 } 2727 2728 JSONArray jSamplers = (JSONArray) json.get("samplers"); 2729 samplers = new HashMap<String, Double>(jSamplers.size()); 2730 for (Object groupO : jSamplers) { 2731 JSONObject group = (JSONObject) groupO; 2732 String groupName = group.get("group").toString() + "."; 2733 JSONArray data = (JSONArray) group.get("data"); 2734 for (Object datO : data) { 2735 JSONObject dat = (JSONObject) datO; 2736 samplers.put(groupName + dat.get("name").toString(), Double.valueOf(dat.get("value").toString())); 2737 } 2738 } 2739 2740 JSONArray jTimers = (JSONArray) json.get("timers"); 2741 timers = new HashMap<String, Timer>(jTimers.size()); 2742 for (Object groupO : jTimers) { 2743 JSONObject group = (JSONObject) groupO; 2744 String groupName = group.get("group").toString() + "."; 2745 JSONArray data = (JSONArray) group.get("data"); 2746 for (Object datO : data) { 2747 JSONObject dat = (JSONObject) datO; 2748 timers.put(groupName + dat.get("name").toString(), new Timer(dat)); 2749 } 2750 } 2751 } 2752 2753 public class Timer { 2754 private double ownTimeStdDev; 2755 private long ownTimeAvg; 2756 private long ownMaxTime; 2757 private long ownMinTime; 2758 private double totalTimeStdDev; 2759 private long totalTimeAvg; 2760 private long totalMaxTime; 2761 private long totalMinTime; 2762 private long ticks; 2763 2764 public Timer(JSONObject json) { 2765 ownTimeStdDev = Double.valueOf(json.get("ownTimeStdDev").toString()); 2766 ownTimeAvg = Long.valueOf(json.get("ownTimeAvg").toString()); 2767 ownMaxTime = Long.valueOf(json.get("ownMaxTime").toString()); 2768 ownMinTime = Long.valueOf(json.get("ownMinTime").toString()); 2769 totalTimeStdDev = Double.valueOf(json.get("totalTimeStdDev").toString()); 2770 totalTimeAvg = 
Long.valueOf(json.get("totalTimeAvg").toString()); 2771 totalMaxTime = Long.valueOf(json.get("totalMaxTime").toString()); 2772 totalMinTime = Long.valueOf(json.get("totalMinTime").toString()); 2773 ticks = Long.valueOf(json.get("ticks").toString()); 2774 } 2775 2776 public double getOwnTimeStandardDeviation() { 2777 return ownTimeStdDev; 2778 } 2779 2780 public long getOwnTimeAverage() { 2781 return ownTimeAvg; 2782 } 2783 2784 public long getOwnMaxTime() { 2785 return ownMaxTime; 2786 } 2787 2788 public long getOwnMinTime() { 2789 return ownMinTime; 2790 } 2791 2792 public double getTotalTimeStandardDeviation() { 2793 return totalTimeStdDev; 2794 } 2795 2796 public long getTotalTimeAverage() { 2797 return totalTimeAvg; 2798 } 2799 2800 public long getTotalMaxTime() { 2801 return totalMaxTime; 2802 } 2803 2804 public long getTotalMinTime() { 2805 return totalMinTime; 2806 } 2807 2808 public long getTicks() { 2809 return ticks; 2810 } 2811 2812 @Override 2813 public String toString() { 2814 StringBuilder sb = new StringBuilder(); 2815 sb.append("\town time standard deviation : ").append(ownTimeStdDev); 2816 sb.append("\n\town average time : ").append(ownTimeAvg); 2817 sb.append("\n\town max time : ").append(ownMaxTime); 2818 sb.append("\n\town min time : ").append(ownMinTime); 2819 sb.append("\n\ttotal time standard deviation : ").append(totalTimeStdDev); 2820 sb.append("\n\ttotal average time : ").append(totalTimeAvg); 2821 sb.append("\n\ttotal max time : ").append(totalMaxTime); 2822 sb.append("\n\ttotal min time : ").append(totalMinTime); 2823 sb.append("\n\tticks : ").append(ticks); 2824 return sb.toString(); 2825 } 2826 } 2827 2828 public Map<String, Long> getCounters() { 2829 return counters; 2830 } 2831 2832 public Map<String, Object> getVariables() { 2833 return variables; 2834 } 2835 2836 public Map<String, Double> getSamplers() { 2837 return samplers; 2838 } 2839 2840 public Map<String, Timer> getTimers() { 2841 return timers; 2842 } 2843 } 2844 2845 /** 
2846 * Return the Oozie instrumentation. If null is returned, then try {@link #getMetrics()}. 2847 * 2848 * @return the Oozie intstrumentation or null. 2849 * @throws OozieClientException throw if the intstrumentation could not be retrieved. 2850 */ 2851 public Instrumentation getInstrumentation() throws OozieClientException { 2852 return new GetInstrumentation().call(); 2853 } 2854 2855 /** 2856 * Check if the string is not null or not empty. 2857 * 2858 * @param str 2859 * @param name 2860 * @return string 2861 */ 2862 public static String notEmpty(String str, String name) { 2863 if (str == null) { 2864 throw new IllegalArgumentException(name + " cannot be null"); 2865 } 2866 if (str.length() == 0) { 2867 throw new IllegalArgumentException(name + " cannot be empty"); 2868 } 2869 return str; 2870 } 2871 2872 /** 2873 * Check if the object is not null. 2874 * 2875 * @param <T> 2876 * @param obj 2877 * @param name 2878 * @return string 2879 */ 2880 public static <T> T notNull(T obj, String name) { 2881 if (obj == null) { 2882 throw new IllegalArgumentException(name + " cannot be null"); 2883 } 2884 return obj; 2885 } 2886 2887}