001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.ssh; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileWriter; 023 import java.io.IOException; 024 import java.io.InputStreamReader; 025 import java.util.List; 026 import java.util.concurrent.Callable; 027 028 import org.apache.oozie.client.WorkflowAction; 029 import org.apache.oozie.client.OozieClient; 030 import org.apache.oozie.client.WorkflowAction.Status; 031 import org.apache.oozie.action.ActionExecutor; 032 import org.apache.oozie.action.ActionExecutorException; 033 import org.apache.oozie.service.CallbackService; 034 import org.apache.oozie.servlet.CallbackServlet; 035 import org.apache.oozie.service.Services; 036 import org.apache.oozie.util.IOUtils; 037 import org.apache.oozie.util.PropertiesUtils; 038 import org.apache.oozie.util.XLog; 039 import org.apache.oozie.util.XmlUtils; 040 import org.jdom.Element; 041 import org.jdom.JDOMException; 042 import org.jdom.Namespace; 043 044 /** 045 * Ssh action executor. <p/> <ul> <li>Execute the shell commands on the remote host</li> <li>Copies the base and wrapper 046 * scripts on to the remote location</li> <li>Base script is used to run the command on the remote host</li> <li>Wrapper 047 * script is used to check the status of the submitted command</li> <li>handles the submission failures</li> </ul> 048 */ 049 public class SshActionExecutor extends ActionExecutor { 050 public static final String ACTION_TYPE = "ssh"; 051 052 /** 053 * Configuration parameter which specifies whether the specified ssh user is allowed, or has to be the job user. 054 */ 055 public static final String CONF_SSH_ALLOW_USER_AT_HOST = CONF_PREFIX + "ssh.allow.user.at.host"; 056 057 protected static final String SSH_COMMAND_OPTIONS = 058 "-o PasswordAuthentication=no -o KbdInteractiveDevices=no -o StrictHostKeyChecking=no -o ConnectTimeout=20 "; 059 060 protected static final String SSH_COMMAND_BASE = "ssh " + SSH_COMMAND_OPTIONS; 061 protected static final String SCP_COMMAND_BASE = "scp " + SSH_COMMAND_OPTIONS; 062 063 public static final String ERR_SETUP_FAILED = "SETUP_FAILED"; 064 public static final String ERR_EXECUTION_FAILED = "EXECUTION_FAILED"; 065 public static final String ERR_UNKNOWN_ERROR = "UNKOWN_ERROR"; 066 public static final String ERR_COULD_NOT_CONNECT = "COULD_NOT_CONNECT"; 067 public static final String ERR_HOST_RESOLUTION = "COULD_NOT_RESOLVE_HOST"; 068 public static final String ERR_FNF = "FNF"; 069 public static final String ERR_AUTH_FAILED = "AUTH_FAILED"; 070 public static final String ERR_NO_EXEC_PERM = "NO_EXEC_PERM"; 071 public static final String ERR_USER_MISMATCH = "ERR_USER_MISMATCH"; 072 public static final String ERR_EXCEDE_LEN = "ERR_OUTPUT_EXCEED_MAX_LEN"; 073 074 public static final String DELETE_TMP_DIR = "oozie.action.ssh.delete.remote.tmp.dir"; 075 076 public static final String HTTP_COMMAND = "oozie.action.ssh.http.command"; 077 078 public static final String HTTP_COMMAND_OPTIONS = "oozie.action.ssh.http.command.post.options"; 079 080 private static final String EXT_STATUS_VAR = "#status"; 081 082 private static int maxLen; 083 private static boolean allowSshUserAtHost; 084 085 protected SshActionExecutor() { 086 super(ACTION_TYPE); 087 } 088 089 /** 090 * Initialize Action. 091 */ 092 @Override 093 public void initActionType() { 094 super.initActionType(); 095 maxLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024); 096 allowSshUserAtHost = getOozieConf().getBoolean(CONF_SSH_ALLOW_USER_AT_HOST, true); 097 registerError(InterruptedException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH001"); 098 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH002"); 099 initSshScripts(); 100 } 101 102 /** 103 * Check ssh action status. 104 * 105 * @param context action execution context. 106 * @param action action object. 107 */ 108 @Override 109 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 110 Status status = getActionStatus(context, action); 111 boolean captureOutput = false; 112 try { 113 Element eConf = XmlUtils.parseXml(action.getConf()); 114 Namespace ns = eConf.getNamespace(); 115 captureOutput = eConf.getChild("capture-output", ns) != null; 116 } 117 catch (JDOMException ex) { 118 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_XML_PARSE_FAILED", 119 "unknown error", ex); 120 } 121 XLog log = XLog.getLog(getClass()); 122 log.debug("Capture Output: {0}", captureOutput); 123 if (status == Status.OK) { 124 if (captureOutput) { 125 String outFile = getRemoteFileName(context, action, "stdout", false, true); 126 String dataCommand = SSH_COMMAND_BASE + action.getTrackerUri() + " cat " + outFile; 127 log.debug("Ssh command [{0}]", dataCommand); 128 try { 129 Process process = Runtime.getRuntime().exec(dataCommand.split("\\s")); 130 StringBuffer buffer = new StringBuffer(); 131 boolean overflow = false; 132 drainBuffers(process, buffer, null, maxLen); 133 if (buffer.length() > maxLen) { 134 overflow = true; 135 } 136 if (overflow) { 137 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, 138 "ERR_OUTPUT_EXCEED_MAX_LEN", "unknown error"); 139 } 140 context.setExecutionData(status.toString(), PropertiesUtils.stringToProperties(buffer.toString())); 141 } 142 catch (Exception ex) { 143 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_UNKNOWN_ERROR", 144 "unknown error", ex); 145 } 146 } 147 else { 148 context.setExecutionData(status.toString(), null); 149 } 150 } 151 else { 152 if (status == Status.ERROR) { 153 context.setExecutionData(status.toString(), null); 154 } 155 else { 156 context.setExternalStatus(status.toString()); 157 } 158 } 159 } 160 161 /** 162 * Kill ssh action. 163 * 164 * @param context action execution context. 165 * @param action object. 166 */ 167 @Override 168 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 169 String command = "ssh " + action.getTrackerUri() + " kill -KILL " + action.getExternalId(); 170 int returnValue = getReturnValue(command); 171 if (returnValue != 0) { 172 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_TO_KILL", XLog.format( 173 "Unable to kill process {0} on {1}", action.getExternalId(), action.getTrackerUri())); 174 } 175 context.setEndData(WorkflowAction.Status.KILLED, "ERROR"); 176 } 177 178 /** 179 * Start the ssh action execution. 180 * 181 * @param context action execution context. 182 * @param action action object. 183 */ 184 @SuppressWarnings("unchecked") 185 @Override 186 public void start(final Context context, final WorkflowAction action) throws ActionExecutorException { 187 XLog log = XLog.getLog(getClass()); 188 log.info("start() begins"); 189 String confStr = action.getConf(); 190 Element conf; 191 try { 192 conf = XmlUtils.parseXml(confStr); 193 } 194 catch (Exception ex) { 195 throw convertException(ex); 196 } 197 Namespace nameSpace = conf.getNamespace(); 198 Element hostElement = conf.getChild("host", nameSpace); 199 String hostString = hostElement.getValue().trim(); 200 hostString = prepareUserHost(hostString, context); 201 final String host = hostString; 202 final String dirLocation = execute(new Callable<String>() { 203 public String call() throws Exception { 204 return setupRemote(host, context, action); 205 } 206 207 }); 208 209 String runningPid = execute(new Callable<String>() { 210 public String call() throws Exception { 211 return checkIfRunning(host, context, action); 212 } 213 }); 214 String pid = ""; 215 216 if (runningPid == null) { 217 final Element commandElement = conf.getChild("command", nameSpace); 218 final boolean ignoreOutput = conf.getChild("capture-output", nameSpace) == null; 219 220 if (commandElement != null) { 221 List<Element> argsList = conf.getChildren("args", nameSpace); 222 StringBuilder args = new StringBuilder(""); 223 if ((argsList != null) && (argsList.size() > 0)) { 224 for (Element argsElement : argsList) { 225 args = args.append(argsElement.getValue()).append(" "); 226 } 227 args.setLength(args.length() - 1); 228 } 229 final String argsString = args.toString(); 230 final String recoveryId = context.getRecoveryId(); 231 pid = execute(new Callable<String>() { 232 233 @Override 234 public String call() throws Exception { 235 return doExecute(host, dirLocation, commandElement.getValue(), argsString, ignoreOutput, 236 action, recoveryId); 237 } 238 239 }); 240 } 241 context.setStartData(pid, host, host); 242 } 243 else { 244 pid = runningPid; 245 context.setStartData(pid, host, host); 246 check(context, action); 247 } 248 log.info("start() ends"); 249 } 250 251 private String checkIfRunning(String host, final Context context, final WorkflowAction action) { 252 String pid = null; 253 String outFile = getRemoteFileName(context, action, "pid", false, false); 254 String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile; 255 try { 256 Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s")); 257 StringBuffer buffer = new StringBuffer(); 258 drainBuffers(process, buffer, null, maxLen); 259 pid = getFirstLine(buffer); 260 261 if (Long.valueOf(pid) > 0) { 262 return pid; 263 } 264 else { 265 return null; 266 } 267 } 268 catch (Exception e) { 269 return null; 270 } 271 } 272 273 /** 274 * Get remote host working location. 275 * 276 * @param context action execution context 277 * @param action Action 278 * @param fileExtension Extension to be added to file name 279 * @param dirOnly Get the Directory only 280 * @param useExtId Flag to use external ID in the path 281 * @return remote host file name/Directory. 282 */ 283 public String getRemoteFileName(Context context, WorkflowAction action, String fileExtension, boolean dirOnly, 284 boolean useExtId) { 285 String path = getActionDirPath(context.getWorkflow().getId(), action, ACTION_TYPE, false) + "/"; 286 if (dirOnly) { 287 return path; 288 } 289 if (useExtId) { 290 path = path + action.getExternalId() + "."; 291 } 292 path = path + context.getRecoveryId() + "." + fileExtension; 293 return path; 294 } 295 296 /** 297 * Utility method to execute command. 298 * 299 * @param command Command to execute as String. 300 * @return exit status of the execution. 301 * @throws IOException if process exits with status nonzero. 302 * @throws InterruptedException if process does not run properly. 303 */ 304 public int executeCommand(String command) throws IOException, InterruptedException { 305 Runtime runtime = Runtime.getRuntime(); 306 Process p = runtime.exec(command.split("\\s")); 307 308 StringBuffer errorBuffer = new StringBuffer(); 309 int exitValue = drainBuffers(p, null, errorBuffer, maxLen); 310 311 String error = null; 312 if (exitValue != 0) { 313 error = getTruncatedString(errorBuffer); 314 throw new IOException(XLog.format("Not able to perform operation [{0}]", command) + " | " + "ErrorStream: " 315 + error); 316 } 317 return exitValue; 318 } 319 320 /** 321 * Do ssh action execution setup on remote host. 322 * 323 * @param host host name. 324 * @param context action execution context. 325 * @param action action object. 326 * @return remote host working directory. 327 * @throws IOException thrown if failed to setup. 328 * @throws InterruptedException thrown if any interruption happens. 329 */ 330 protected String setupRemote(String host, Context context, WorkflowAction action) throws IOException, InterruptedException { 331 XLog log = XLog.getLog(getClass()); 332 log.info("Attempting to copy ssh base scripts to remote host [{0}]", host); 333 String localDirLocation = Services.get().getRuntimeDir() + "/ssh"; 334 if (localDirLocation.endsWith("/")) { 335 localDirLocation = localDirLocation.substring(0, localDirLocation.length() - 1); 336 } 337 File file = new File(localDirLocation + "/ssh-base.sh"); 338 if (!file.exists()) { 339 throw new IOException("Required Local file " + file.getAbsolutePath() + " not present."); 340 } 341 file = new File(localDirLocation + "/ssh-wrapper.sh"); 342 if (!file.exists()) { 343 throw new IOException("Required Local file " + file.getAbsolutePath() + " not present."); 344 } 345 String remoteDirLocation = getRemoteFileName(context, action, null, true, true); 346 String command = XLog.format("{0}{1} mkdir -p {2} ", SSH_COMMAND_BASE, host, remoteDirLocation).toString(); 347 executeCommand(command); 348 command = XLog.format("{0}{1}/ssh-base.sh {2}/ssh-wrapper.sh {3}:{4}", SCP_COMMAND_BASE, localDirLocation, 349 localDirLocation, host, remoteDirLocation); 350 executeCommand(command); 351 command = XLog.format("{0}{1} chmod +x {2}ssh-base.sh {3}ssh-wrapper.sh ", SSH_COMMAND_BASE, host, 352 remoteDirLocation, remoteDirLocation); 353 executeCommand(command); 354 return remoteDirLocation; 355 } 356 357 /** 358 * Execute the ssh command. 359 * 360 * @param host hostname. 361 * @param dirLocation location of the base and wrapper scripts. 362 * @param cmnd command to be executed. 363 * @param args command arguments. 364 * @param ignoreOutput ignore output option. 365 * @param action action object. 366 * @param recoveryId action id + run number to enable recovery in rerun 367 * @return process id of the running command. 368 * @throws IOException thrown if failed to run the command. 369 * @throws InterruptedException thrown if any interruption happens. 370 */ 371 protected String doExecute(String host, String dirLocation, String cmnd, String args, boolean ignoreOutput, 372 WorkflowAction action, String recoveryId) throws IOException, InterruptedException { 373 XLog log = XLog.getLog(getClass()); 374 Runtime runtime = Runtime.getRuntime(); 375 String callbackPost = ignoreOutput ? "_" : getOozieConf().get(HTTP_COMMAND_OPTIONS).replace(" ", "%%%"); 376 // TODO check 377 String callBackUrl = Services.get().get(CallbackService.class) 378 .createCallBackUrl(action.getId(), EXT_STATUS_VAR); 379 String command = XLog.format("{0}{1} {2}ssh-base.sh {3} \"{4}\" \"{5}\" {6} {7} {8} ", SSH_COMMAND_BASE, host, 380 dirLocation, getOozieConf().get(HTTP_COMMAND), callBackUrl, callbackPost, recoveryId, cmnd, args) 381 .toString(); 382 log.trace("Executing ssh command [{0}]", command); 383 Process p = runtime.exec(command.split("\\s")); 384 String pid = ""; 385 386 StringBuffer inputBuffer = new StringBuffer(); 387 StringBuffer errorBuffer = new StringBuffer(); 388 int exitValue = drainBuffers(p, inputBuffer, errorBuffer, maxLen); 389 390 pid = getFirstLine(inputBuffer); 391 392 String error = null; 393 if (exitValue != 0) { 394 error = getTruncatedString(errorBuffer); 395 throw new IOException(XLog.format("Not able to execute ssh-base.sh on {0}", host) + " | " + "ErrorStream: " 396 + error); 397 } 398 return pid; 399 } 400 401 /** 402 * End action execution. 403 * 404 * @param context action execution context. 405 * @param action action object. 406 * @throws ActionExecutorException thrown if action end execution fails. 407 */ 408 public void end(final Context context, final WorkflowAction action) throws ActionExecutorException { 409 if (action.getExternalStatus().equals("OK")) { 410 context.setEndData(WorkflowAction.Status.OK, WorkflowAction.Status.OK.toString()); 411 } 412 else { 413 context.setEndData(WorkflowAction.Status.ERROR, WorkflowAction.Status.ERROR.toString()); 414 } 415 boolean deleteTmpDir = getOozieConf().getBoolean(DELETE_TMP_DIR, true); 416 if (deleteTmpDir) { 417 String tmpDir = getRemoteFileName(context, action, null, true, false); 418 String removeTmpDirCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " rm -rf " + tmpDir; 419 int retVal = getReturnValue(removeTmpDirCmd); 420 if (retVal != 0) { 421 XLog.getLog(getClass()).warn("Cannot delete temp dir {0}", tmpDir); 422 } 423 } 424 } 425 426 /** 427 * Get the return value of a process. 428 * 429 * @param command command to be executed. 430 * @return zero if execution is successful and any non zero value for failure. 431 * @throws ActionExecutorException 432 */ 433 private int getReturnValue(String command) throws ActionExecutorException { 434 int returnValue; 435 Process ps = null; 436 try { 437 ps = Runtime.getRuntime().exec(command.split("\\s")); 438 returnValue = drainBuffers(ps, null, null, 0); 439 } 440 catch (IOException e) { 441 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog.format( 442 "Not able to perform operation {0}", command), e); 443 } 444 finally { 445 ps.destroy(); 446 } 447 return returnValue; 448 } 449 450 /** 451 * Copy the ssh base and wrapper scripts to the local directory. 452 */ 453 private void initSshScripts() { 454 String dirLocation = Services.get().getRuntimeDir() + "/ssh"; 455 File path = new File(dirLocation); 456 if (!path.mkdirs()) { 457 throw new RuntimeException(XLog.format("Not able to create required directory {0}", dirLocation)); 458 } 459 try { 460 IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-base.sh", -1), new FileWriter(dirLocation 461 + "/ssh-base.sh")); 462 IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-wrapper.sh", -1), new FileWriter(dirLocation 463 + "/ssh-wrapper.sh")); 464 } 465 catch (IOException ie) { 466 throw new RuntimeException(XLog.format("Not able to copy required scripts file to {0} " 467 + "for SshActionHandler", dirLocation)); 468 } 469 } 470 471 /** 472 * Get action status. 473 * 474 * @param action action object. 475 * @return status of the action(RUNNING/OK/ERROR). 476 * @throws ActionExecutorException thrown if there is any error in getting status. 477 */ 478 protected Status getActionStatus(Context context, WorkflowAction action) throws ActionExecutorException { 479 String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " + action.getExternalId(); 480 Status aStatus; 481 int returnValue = getReturnValue(command); 482 if (returnValue == 0) { 483 aStatus = Status.RUNNING; 484 } 485 else { 486 String outFile = getRemoteFileName(context, action, "error", false, true); 487 String checkErrorCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " ls " + outFile; 488 int retVal = getReturnValue(checkErrorCmd); 489 if (retVal == 0) { 490 aStatus = Status.ERROR; 491 } 492 else { 493 aStatus = Status.OK; 494 } 495 } 496 return aStatus; 497 } 498 499 /** 500 * Execute the callable. 501 * 502 * @param callable required callable. 503 * @throws ActionExecutorException thrown if there is any error in command execution. 504 */ 505 private <T> T execute(Callable<T> callable) throws ActionExecutorException { 506 XLog log = XLog.getLog(getClass()); 507 try { 508 return callable.call(); 509 } 510 catch (IOException ex) { 511 log.warn("Error while executing ssh EXECUTION"); 512 String errorMessage = ex.getMessage(); 513 if (null == errorMessage) { // Unknown IOException 514 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex 515 .getMessage(), ex); 516 } // Host Resolution Issues 517 else { 518 if (errorMessage.contains("Could not resolve hostname") || 519 errorMessage.contains("service not known")) { 520 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_HOST_RESOLUTION, ex 521 .getMessage(), ex); 522 } // Connection Timeout. Host temporarily down. 523 else { 524 if (errorMessage.contains("timed out")) { 525 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_COULD_NOT_CONNECT, 526 ex.getMessage(), ex); 527 }// Local ssh-base or ssh-wrapper missing 528 else { 529 if (errorMessage.contains("Required Local file")) { 530 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF, 531 ex.getMessage(), ex); // local_FNF 532 }// Required oozie bash scripts missing, after the copy was 533 // successful 534 else { 535 if (errorMessage.contains("No such file or directory") 536 && (errorMessage.contains("ssh-base") || errorMessage.contains("ssh-wrapper"))) { 537 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF, 538 ex.getMessage(), ex); // remote 539 // FNF 540 } // Required application execution binary missing (either 541 // caught by ssh-wrapper 542 else { 543 if (errorMessage.contains("command not found")) { 544 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_FNF, ex 545 .getMessage(), ex); // remote 546 // FNF 547 } // Permission denied while connecting 548 else { 549 if (errorMessage.contains("Permission denied")) { 550 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_AUTH_FAILED, ex 551 .getMessage(), ex); 552 } // Permission denied while executing 553 else { 554 if (errorMessage.contains(": Permission denied")) { 555 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_NO_EXEC_PERM, ex 556 .getMessage(), ex); 557 } 558 else { 559 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex 560 .getMessage(), ex); 561 } 562 } 563 } 564 } 565 } 566 } 567 } 568 } 569 } // Any other type of exception 570 catch (Exception ex) { 571 throw convertException(ex); 572 } 573 } 574 575 /** 576 * Checks whether the system is configured to always use the oozie user for ssh, and injects the user if required. 577 * 578 * @param host the host string. 579 * @param context the execution context. 580 * @return the modified host string with a user parameter added on if required. 581 * @throws ActionExecutorException in case the flag to use the oozie user is turned on and there is a mismatch 582 * between the user specified in the host and the oozie user. 583 */ 584 private String prepareUserHost(String host, Context context) throws ActionExecutorException { 585 String oozieUser = context.getProtoActionConf().get(OozieClient.USER_NAME); 586 if (allowSshUserAtHost) { 587 if (!host.contains("@")) { 588 host = oozieUser + "@" + host; 589 } 590 } 591 else { 592 if (host.contains("@")) { 593 if (!host.toLowerCase().startsWith(oozieUser + "@")) { 594 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_USER_MISMATCH, 595 XLog.format("user mismatch between oozie user [{0}] and ssh host [{1}]", oozieUser, host)); 596 } 597 } 598 else { 599 host = oozieUser + "@" + host; 600 } 601 } 602 return host; 603 } 604 605 public boolean isCompleted(String externalStatus) { 606 return true; 607 } 608 609 /** 610 * Truncate the string to max length. 611 * 612 * @param strBuffer 613 * @return truncated string string 614 */ 615 private String getTruncatedString(StringBuffer strBuffer) { 616 617 if (strBuffer.length() <= maxLen) { 618 return strBuffer.toString(); 619 } 620 else { 621 return strBuffer.substring(0, maxLen); 622 } 623 } 624 625 /** 626 * Drains the inputStream and errorStream of the Process being executed. The contents of the streams are stored if a 627 * buffer is provided for the stream. 628 * 629 * @param p The Process instance. 630 * @param inputBuffer The buffer into which STDOUT is to be read. Can be null if only draining is required. 631 * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required. 632 * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the 633 * store content may exceed this length. 634 * @return the exit value of the process. 635 * @throws IOException 636 */ 637 private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength) 638 throws IOException { 639 int exitValue = -1; 640 BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream())); 641 BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream())); 642 643 int inBytesRead = 0; 644 int errBytesRead = 0; 645 646 boolean processEnded = false; 647 648 try { 649 while (!processEnded) { 650 try { 651 exitValue = p.exitValue(); 652 processEnded = true; 653 } 654 catch (IllegalThreadStateException ex) { 655 // Continue to drain. 656 } 657 658 inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded); 659 errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded); 660 } 661 } 662 finally { 663 ir.close(); 664 er.close(); 665 } 666 return exitValue; 667 } 668 669 /** 670 * Reads the contents of a stream and stores them into the provided buffer. 671 * 672 * @param br The stream to be read. 673 * @param storageBuf The buffer into which the contents of the stream are to be stored. 674 * @param maxLength The maximum number of bytes to be stored in the buffer. An indicative value and may be 675 * exceeded. 676 * @param bytesRead The number of bytes read from this stream to date. 677 * @param readAll If true, the stream is drained while their is data available in it. Otherwise, only a single chunk 678 * of data is read, irrespective of how much is available. 679 * @return 680 * @throws IOException 681 */ 682 private int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll) 683 throws IOException { 684 int bReadSession = 0; 685 if (br.ready()) { 686 char[] buf = new char[1024]; 687 do { 688 int bReadCurrent = br.read(buf, 0, 1024); 689 if (storageBuf != null && bytesRead < maxLength) { 690 storageBuf.append(buf, 0, bReadCurrent); 691 } 692 bReadSession += bReadCurrent; 693 } while (br.ready() && readAll); 694 } 695 return bReadSession; 696 } 697 698 /** 699 * Returns the first line from a StringBuffer, recognized by the new line character \n. 700 * 701 * @param buffer The StringBuffer from which the first line is required. 702 * @return The first line of the buffer. 703 */ 704 private String getFirstLine(StringBuffer buffer) { 705 int newLineIndex = 0; 706 newLineIndex = buffer.indexOf("\n"); 707 if (newLineIndex == -1) { 708 return buffer.toString(); 709 } 710 else { 711 return buffer.substring(0, newLineIndex); 712 } 713 } 714 }