This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.ssh;
019
020 import java.io.BufferedReader;
021 import java.io.File;
022 import java.io.FileWriter;
023 import java.io.IOException;
024 import java.io.InputStreamReader;
025 import java.util.List;
026 import java.util.concurrent.Callable;
027
028 import org.apache.oozie.client.WorkflowAction;
029 import org.apache.oozie.client.OozieClient;
030 import org.apache.oozie.client.WorkflowAction.Status;
031 import org.apache.oozie.action.ActionExecutor;
032 import org.apache.oozie.action.ActionExecutorException;
033 import org.apache.oozie.service.CallbackService;
034 import org.apache.oozie.servlet.CallbackServlet;
035 import org.apache.oozie.service.Services;
036 import org.apache.oozie.util.IOUtils;
037 import org.apache.oozie.util.PropertiesUtils;
038 import org.apache.oozie.util.XLog;
039 import org.apache.oozie.util.XmlUtils;
040 import org.jdom.Element;
041 import org.jdom.JDOMException;
042 import org.jdom.Namespace;
043
044 /**
045 * Ssh action executor. <p/> <ul> <li>Execute the shell commands on the remote host</li> <li>Copies the base and wrapper
046 * scripts on to the remote location</li> <li>Base script is used to run the command on the remote host</li> <li>Wrapper
047 * script is used to check the status of the submitted command</li> <li>handles the submission failures</li> </ul>
048 */
049 public class SshActionExecutor extends ActionExecutor {
050 public static final String ACTION_TYPE = "ssh";
051
052 /**
053 * Configuration parameter which specifies whether the specified ssh user is allowed, or has to be the job user.
054 */
055 public static final String CONF_SSH_ALLOW_USER_AT_HOST = CONF_PREFIX + "ssh.allow.user.at.host";
056
057 protected static final String SSH_COMMAND_OPTIONS =
058 "-o PasswordAuthentication=no -o KbdInteractiveDevices=no -o StrictHostKeyChecking=no -o ConnectTimeout=20 ";
059
060 protected static final String SSH_COMMAND_BASE = "ssh " + SSH_COMMAND_OPTIONS;
061 protected static final String SCP_COMMAND_BASE = "scp " + SSH_COMMAND_OPTIONS;
062
063 public static final String ERR_SETUP_FAILED = "SETUP_FAILED";
064 public static final String ERR_EXECUTION_FAILED = "EXECUTION_FAILED";
065 public static final String ERR_UNKNOWN_ERROR = "UNKOWN_ERROR";
066 public static final String ERR_COULD_NOT_CONNECT = "COULD_NOT_CONNECT";
067 public static final String ERR_HOST_RESOLUTION = "COULD_NOT_RESOLVE_HOST";
068 public static final String ERR_FNF = "FNF";
069 public static final String ERR_AUTH_FAILED = "AUTH_FAILED";
070 public static final String ERR_NO_EXEC_PERM = "NO_EXEC_PERM";
071 public static final String ERR_USER_MISMATCH = "ERR_USER_MISMATCH";
072 public static final String ERR_EXCEDE_LEN = "ERR_OUTPUT_EXCEED_MAX_LEN";
073
074 public static final String DELETE_TMP_DIR = "oozie.action.ssh.delete.remote.tmp.dir";
075
076 public static final String HTTP_COMMAND = "oozie.action.ssh.http.command";
077
078 public static final String HTTP_COMMAND_OPTIONS = "oozie.action.ssh.http.command.post.options";
079
080 private static final String EXT_STATUS_VAR = "#status";
081
082 private static int maxLen;
083 private static boolean allowSshUserAtHost;
084
085 protected SshActionExecutor() {
086 super(ACTION_TYPE);
087 }
088
089 /**
090 * Initialize Action.
091 */
092 @Override
093 public void initActionType() {
094 super.initActionType();
095 maxLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024);
096 allowSshUserAtHost = getOozieConf().getBoolean(CONF_SSH_ALLOW_USER_AT_HOST, true);
097 registerError(InterruptedException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH001");
098 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH002");
099 initSshScripts();
100 }
101
102 /**
103 * Check ssh action status.
104 *
105 * @param context action execution context.
106 * @param action action object.
107 */
108 @Override
109 public void check(Context context, WorkflowAction action) throws ActionExecutorException {
110 Status status = getActionStatus(context, action);
111 boolean captureOutput = false;
112 try {
113 Element eConf = XmlUtils.parseXml(action.getConf());
114 Namespace ns = eConf.getNamespace();
115 captureOutput = eConf.getChild("capture-output", ns) != null;
116 }
117 catch (JDOMException ex) {
118 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_XML_PARSE_FAILED",
119 "unknown error", ex);
120 }
121 XLog log = XLog.getLog(getClass());
122 log.debug("Capture Output: {0}", captureOutput);
123 if (status == Status.OK) {
124 if (captureOutput) {
125 String outFile = getRemoteFileName(context, action, "stdout", false, true);
126 String dataCommand = SSH_COMMAND_BASE + action.getTrackerUri() + " cat " + outFile;
127 log.debug("Ssh command [{0}]", dataCommand);
128 try {
129 Process process = Runtime.getRuntime().exec(dataCommand.split("\\s"));
130 StringBuffer buffer = new StringBuffer();
131 boolean overflow = false;
132 drainBuffers(process, buffer, null, maxLen);
133 if (buffer.length() > maxLen) {
134 overflow = true;
135 }
136 if (overflow) {
137 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR,
138 "ERR_OUTPUT_EXCEED_MAX_LEN", "unknown error");
139 }
140 context.setExecutionData(status.toString(), PropertiesUtils.stringToProperties(buffer.toString()));
141 }
142 catch (Exception ex) {
143 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_UNKNOWN_ERROR",
144 "unknown error", ex);
145 }
146 }
147 else {
148 context.setExecutionData(status.toString(), null);
149 }
150 }
151 else {
152 if (status == Status.ERROR) {
153 context.setExecutionData(status.toString(), null);
154 }
155 else {
156 context.setExternalStatus(status.toString());
157 }
158 }
159 }
160
161 /**
162 * Kill ssh action.
163 *
164 * @param context action execution context.
165 * @param action object.
166 */
167 @Override
168 public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
169 String command = "ssh " + action.getTrackerUri() + " kill -KILL " + action.getExternalId();
170 int returnValue = getReturnValue(command);
171 if (returnValue != 0) {
172 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_TO_KILL", XLog.format(
173 "Unable to kill process {0} on {1}", action.getExternalId(), action.getTrackerUri()));
174 }
175 context.setEndData(WorkflowAction.Status.KILLED, "ERROR");
176 }
177
178 /**
179 * Start the ssh action execution.
180 *
181 * @param context action execution context.
182 * @param action action object.
183 */
184 @SuppressWarnings("unchecked")
185 @Override
186 public void start(final Context context, final WorkflowAction action) throws ActionExecutorException {
187 XLog log = XLog.getLog(getClass());
188 log.info("start() begins");
189 String confStr = action.getConf();
190 Element conf;
191 try {
192 conf = XmlUtils.parseXml(confStr);
193 }
194 catch (Exception ex) {
195 throw convertException(ex);
196 }
197 Namespace nameSpace = conf.getNamespace();
198 Element hostElement = conf.getChild("host", nameSpace);
199 String hostString = hostElement.getValue().trim();
200 hostString = prepareUserHost(hostString, context);
201 final String host = hostString;
202 final String dirLocation = execute(new Callable<String>() {
203 public String call() throws Exception {
204 return setupRemote(host, context, action);
205 }
206
207 });
208
209 String runningPid = execute(new Callable<String>() {
210 public String call() throws Exception {
211 return checkIfRunning(host, context, action);
212 }
213 });
214 String pid = "";
215
216 if (runningPid == null) {
217 final Element commandElement = conf.getChild("command", nameSpace);
218 final boolean ignoreOutput = conf.getChild("capture-output", nameSpace) == null;
219
220 if (commandElement != null) {
221 List<Element> argsList = conf.getChildren("args", nameSpace);
222 StringBuilder args = new StringBuilder("");
223 if ((argsList != null) && (argsList.size() > 0)) {
224 for (Element argsElement : argsList) {
225 args = args.append(argsElement.getValue()).append(" ");
226 }
227 args.setLength(args.length() - 1);
228 }
229 final String argsString = args.toString();
230 final String recoveryId = context.getRecoveryId();
231 pid = execute(new Callable<String>() {
232
233 @Override
234 public String call() throws Exception {
235 return doExecute(host, dirLocation, commandElement.getValue(), argsString, ignoreOutput,
236 action, recoveryId);
237 }
238
239 });
240 }
241 context.setStartData(pid, host, host);
242 }
243 else {
244 pid = runningPid;
245 context.setStartData(pid, host, host);
246 check(context, action);
247 }
248 log.info("start() ends");
249 }
250
251 private String checkIfRunning(String host, final Context context, final WorkflowAction action) {
252 String pid = null;
253 String outFile = getRemoteFileName(context, action, "pid", false, false);
254 String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile;
255 try {
256 Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s"));
257 StringBuffer buffer = new StringBuffer();
258 drainBuffers(process, buffer, null, maxLen);
259 pid = getFirstLine(buffer);
260
261 if (Long.valueOf(pid) > 0) {
262 return pid;
263 }
264 else {
265 return null;
266 }
267 }
268 catch (Exception e) {
269 return null;
270 }
271 }
272
273 /**
274 * Get remote host working location.
275 *
276 * @param context action execution context
277 * @param action Action
278 * @param fileExtension Extension to be added to file name
279 * @param dirOnly Get the Directory only
280 * @param useExtId Flag to use external ID in the path
281 * @return remote host file name/Directory.
282 */
283 public String getRemoteFileName(Context context, WorkflowAction action, String fileExtension, boolean dirOnly,
284 boolean useExtId) {
285 String path = getActionDirPath(context.getWorkflow().getId(), action, ACTION_TYPE, false) + "/";
286 if (dirOnly) {
287 return path;
288 }
289 if (useExtId) {
290 path = path + action.getExternalId() + ".";
291 }
292 path = path + context.getRecoveryId() + "." + fileExtension;
293 return path;
294 }
295
296 /**
297 * Utility method to execute command.
298 *
299 * @param command Command to execute as String.
300 * @return exit status of the execution.
301 * @throws IOException if process exits with status nonzero.
302 * @throws InterruptedException if process does not run properly.
303 */
304 public int executeCommand(String command) throws IOException, InterruptedException {
305 Runtime runtime = Runtime.getRuntime();
306 Process p = runtime.exec(command.split("\\s"));
307
308 StringBuffer errorBuffer = new StringBuffer();
309 int exitValue = drainBuffers(p, null, errorBuffer, maxLen);
310
311 String error = null;
312 if (exitValue != 0) {
313 error = getTruncatedString(errorBuffer);
314 throw new IOException(XLog.format("Not able to perform operation [{0}]", command) + " | " + "ErrorStream: "
315 + error);
316 }
317 return exitValue;
318 }
319
320 /**
321 * Do ssh action execution setup on remote host.
322 *
323 * @param host host name.
324 * @param context action execution context.
325 * @param action action object.
326 * @return remote host working directory.
327 * @throws IOException thrown if failed to setup.
328 * @throws InterruptedException thrown if any interruption happens.
329 */
330 protected String setupRemote(String host, Context context, WorkflowAction action) throws IOException, InterruptedException {
331 XLog log = XLog.getLog(getClass());
332 log.info("Attempting to copy ssh base scripts to remote host [{0}]", host);
333 String localDirLocation = Services.get().getRuntimeDir() + "/ssh";
334 if (localDirLocation.endsWith("/")) {
335 localDirLocation = localDirLocation.substring(0, localDirLocation.length() - 1);
336 }
337 File file = new File(localDirLocation + "/ssh-base.sh");
338 if (!file.exists()) {
339 throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
340 }
341 file = new File(localDirLocation + "/ssh-wrapper.sh");
342 if (!file.exists()) {
343 throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
344 }
345 String remoteDirLocation = getRemoteFileName(context, action, null, true, true);
346 String command = XLog.format("{0}{1} mkdir -p {2} ", SSH_COMMAND_BASE, host, remoteDirLocation).toString();
347 executeCommand(command);
348 command = XLog.format("{0}{1}/ssh-base.sh {2}/ssh-wrapper.sh {3}:{4}", SCP_COMMAND_BASE, localDirLocation,
349 localDirLocation, host, remoteDirLocation);
350 executeCommand(command);
351 command = XLog.format("{0}{1} chmod +x {2}ssh-base.sh {3}ssh-wrapper.sh ", SSH_COMMAND_BASE, host,
352 remoteDirLocation, remoteDirLocation);
353 executeCommand(command);
354 return remoteDirLocation;
355 }
356
357 /**
358 * Execute the ssh command.
359 *
360 * @param host hostname.
361 * @param dirLocation location of the base and wrapper scripts.
362 * @param cmnd command to be executed.
363 * @param args command arguments.
364 * @param ignoreOutput ignore output option.
365 * @param action action object.
366 * @param recoveryId action id + run number to enable recovery in rerun
367 * @return process id of the running command.
368 * @throws IOException thrown if failed to run the command.
369 * @throws InterruptedException thrown if any interruption happens.
370 */
371 protected String doExecute(String host, String dirLocation, String cmnd, String args, boolean ignoreOutput,
372 WorkflowAction action, String recoveryId) throws IOException, InterruptedException {
373 XLog log = XLog.getLog(getClass());
374 Runtime runtime = Runtime.getRuntime();
375 String callbackPost = ignoreOutput ? "_" : getOozieConf().get(HTTP_COMMAND_OPTIONS).replace(" ", "%%%");
376 // TODO check
377 String callBackUrl = Services.get().get(CallbackService.class)
378 .createCallBackUrl(action.getId(), EXT_STATUS_VAR);
379 String command = XLog.format("{0}{1} {2}ssh-base.sh {3} \"{4}\" \"{5}\" {6} {7} {8} ", SSH_COMMAND_BASE, host,
380 dirLocation, getOozieConf().get(HTTP_COMMAND), callBackUrl, callbackPost, recoveryId, cmnd, args)
381 .toString();
382 log.trace("Executing ssh command [{0}]", command);
383 Process p = runtime.exec(command.split("\\s"));
384 String pid = "";
385
386 StringBuffer inputBuffer = new StringBuffer();
387 StringBuffer errorBuffer = new StringBuffer();
388 int exitValue = drainBuffers(p, inputBuffer, errorBuffer, maxLen);
389
390 pid = getFirstLine(inputBuffer);
391
392 String error = null;
393 if (exitValue != 0) {
394 error = getTruncatedString(errorBuffer);
395 throw new IOException(XLog.format("Not able to execute ssh-base.sh on {0}", host) + " | " + "ErrorStream: "
396 + error);
397 }
398 return pid;
399 }
400
401 /**
402 * End action execution.
403 *
404 * @param context action execution context.
405 * @param action action object.
406 * @throws ActionExecutorException thrown if action end execution fails.
407 */
408 public void end(final Context context, final WorkflowAction action) throws ActionExecutorException {
409 if (action.getExternalStatus().equals("OK")) {
410 context.setEndData(WorkflowAction.Status.OK, WorkflowAction.Status.OK.toString());
411 }
412 else {
413 context.setEndData(WorkflowAction.Status.ERROR, WorkflowAction.Status.ERROR.toString());
414 }
415 boolean deleteTmpDir = getOozieConf().getBoolean(DELETE_TMP_DIR, true);
416 if (deleteTmpDir) {
417 String tmpDir = getRemoteFileName(context, action, null, true, false);
418 String removeTmpDirCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " rm -rf " + tmpDir;
419 int retVal = getReturnValue(removeTmpDirCmd);
420 if (retVal != 0) {
421 XLog.getLog(getClass()).warn("Cannot delete temp dir {0}", tmpDir);
422 }
423 }
424 }
425
426 /**
427 * Get the return value of a process.
428 *
429 * @param command command to be executed.
430 * @return zero if execution is successful and any non zero value for failure.
431 * @throws ActionExecutorException
432 */
433 private int getReturnValue(String command) throws ActionExecutorException {
434 int returnValue;
435 Process ps = null;
436 try {
437 ps = Runtime.getRuntime().exec(command.split("\\s"));
438 returnValue = drainBuffers(ps, null, null, 0);
439 }
440 catch (IOException e) {
441 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog.format(
442 "Not able to perform operation {0}", command), e);
443 }
444 finally {
445 ps.destroy();
446 }
447 return returnValue;
448 }
449
450 /**
451 * Copy the ssh base and wrapper scripts to the local directory.
452 */
453 private void initSshScripts() {
454 String dirLocation = Services.get().getRuntimeDir() + "/ssh";
455 File path = new File(dirLocation);
456 if (!path.mkdirs()) {
457 throw new RuntimeException(XLog.format("Not able to create required directory {0}", dirLocation));
458 }
459 try {
460 IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-base.sh", -1), new FileWriter(dirLocation
461 + "/ssh-base.sh"));
462 IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-wrapper.sh", -1), new FileWriter(dirLocation
463 + "/ssh-wrapper.sh"));
464 }
465 catch (IOException ie) {
466 throw new RuntimeException(XLog.format("Not able to copy required scripts file to {0} "
467 + "for SshActionHandler", dirLocation));
468 }
469 }
470
471 /**
472 * Get action status.
473 *
474 * @param action action object.
475 * @return status of the action(RUNNING/OK/ERROR).
476 * @throws ActionExecutorException thrown if there is any error in getting status.
477 */
478 protected Status getActionStatus(Context context, WorkflowAction action) throws ActionExecutorException {
479 String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " + action.getExternalId();
480 Status aStatus;
481 int returnValue = getReturnValue(command);
482 if (returnValue == 0) {
483 aStatus = Status.RUNNING;
484 }
485 else {
486 String outFile = getRemoteFileName(context, action, "error", false, true);
487 String checkErrorCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " ls " + outFile;
488 int retVal = getReturnValue(checkErrorCmd);
489 if (retVal == 0) {
490 aStatus = Status.ERROR;
491 }
492 else {
493 aStatus = Status.OK;
494 }
495 }
496 return aStatus;
497 }
498
499 /**
500 * Execute the callable.
501 *
502 * @param callable required callable.
503 * @throws ActionExecutorException thrown if there is any error in command execution.
504 */
505 private <T> T execute(Callable<T> callable) throws ActionExecutorException {
506 XLog log = XLog.getLog(getClass());
507 try {
508 return callable.call();
509 }
510 catch (IOException ex) {
511 log.warn("Error while executing ssh EXECUTION");
512 String errorMessage = ex.getMessage();
513 if (null == errorMessage) { // Unknown IOException
514 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex
515 .getMessage(), ex);
516 } // Host Resolution Issues
517 else {
518 if (errorMessage.contains("Could not resolve hostname") ||
519 errorMessage.contains("service not known")) {
520 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_HOST_RESOLUTION, ex
521 .getMessage(), ex);
522 } // Connection Timeout. Host temporarily down.
523 else {
524 if (errorMessage.contains("timed out")) {
525 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_COULD_NOT_CONNECT,
526 ex.getMessage(), ex);
527 }// Local ssh-base or ssh-wrapper missing
528 else {
529 if (errorMessage.contains("Required Local file")) {
530 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
531 ex.getMessage(), ex); // local_FNF
532 }// Required oozie bash scripts missing, after the copy was
533 // successful
534 else {
535 if (errorMessage.contains("No such file or directory")
536 && (errorMessage.contains("ssh-base") || errorMessage.contains("ssh-wrapper"))) {
537 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
538 ex.getMessage(), ex); // remote
539 // FNF
540 } // Required application execution binary missing (either
541 // caught by ssh-wrapper
542 else {
543 if (errorMessage.contains("command not found")) {
544 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_FNF, ex
545 .getMessage(), ex); // remote
546 // FNF
547 } // Permission denied while connecting
548 else {
549 if (errorMessage.contains("Permission denied")) {
550 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_AUTH_FAILED, ex
551 .getMessage(), ex);
552 } // Permission denied while executing
553 else {
554 if (errorMessage.contains(": Permission denied")) {
555 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_NO_EXEC_PERM, ex
556 .getMessage(), ex);
557 }
558 else {
559 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex
560 .getMessage(), ex);
561 }
562 }
563 }
564 }
565 }
566 }
567 }
568 }
569 } // Any other type of exception
570 catch (Exception ex) {
571 throw convertException(ex);
572 }
573 }
574
575 /**
576 * Checks whether the system is configured to always use the oozie user for ssh, and injects the user if required.
577 *
578 * @param host the host string.
579 * @param context the execution context.
580 * @return the modified host string with a user parameter added on if required.
581 * @throws ActionExecutorException in case the flag to use the oozie user is turned on and there is a mismatch
582 * between the user specified in the host and the oozie user.
583 */
584 private String prepareUserHost(String host, Context context) throws ActionExecutorException {
585 String oozieUser = context.getProtoActionConf().get(OozieClient.USER_NAME);
586 if (allowSshUserAtHost) {
587 if (!host.contains("@")) {
588 host = oozieUser + "@" + host;
589 }
590 }
591 else {
592 if (host.contains("@")) {
593 if (!host.toLowerCase().startsWith(oozieUser + "@")) {
594 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_USER_MISMATCH,
595 XLog.format("user mismatch between oozie user [{0}] and ssh host [{1}]", oozieUser, host));
596 }
597 }
598 else {
599 host = oozieUser + "@" + host;
600 }
601 }
602 return host;
603 }
604
605 public boolean isCompleted(String externalStatus) {
606 return true;
607 }
608
609 /**
610 * Truncate the string to max length.
611 *
612 * @param strBuffer
613 * @return truncated string string
614 */
615 private String getTruncatedString(StringBuffer strBuffer) {
616
617 if (strBuffer.length() <= maxLen) {
618 return strBuffer.toString();
619 }
620 else {
621 return strBuffer.substring(0, maxLen);
622 }
623 }
624
625 /**
626 * Drains the inputStream and errorStream of the Process being executed. The contents of the streams are stored if a
627 * buffer is provided for the stream.
628 *
629 * @param p The Process instance.
630 * @param inputBuffer The buffer into which STDOUT is to be read. Can be null if only draining is required.
631 * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required.
632 * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the
633 * store content may exceed this length.
634 * @return the exit value of the process.
635 * @throws IOException
636 */
637 private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength)
638 throws IOException {
639 int exitValue = -1;
640 BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream()));
641 BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream()));
642
643 int inBytesRead = 0;
644 int errBytesRead = 0;
645
646 boolean processEnded = false;
647
648 try {
649 while (!processEnded) {
650 try {
651 exitValue = p.exitValue();
652 processEnded = true;
653 }
654 catch (IllegalThreadStateException ex) {
655 // Continue to drain.
656 }
657
658 inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded);
659 errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded);
660 }
661 }
662 finally {
663 ir.close();
664 er.close();
665 }
666 return exitValue;
667 }
668
669 /**
670 * Reads the contents of a stream and stores them into the provided buffer.
671 *
672 * @param br The stream to be read.
673 * @param storageBuf The buffer into which the contents of the stream are to be stored.
674 * @param maxLength The maximum number of bytes to be stored in the buffer. An indicative value and may be
675 * exceeded.
676 * @param bytesRead The number of bytes read from this stream to date.
677 * @param readAll If true, the stream is drained while their is data available in it. Otherwise, only a single chunk
678 * of data is read, irrespective of how much is available.
679 * @return
680 * @throws IOException
681 */
682 private int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll)
683 throws IOException {
684 int bReadSession = 0;
685 if (br.ready()) {
686 char[] buf = new char[1024];
687 do {
688 int bReadCurrent = br.read(buf, 0, 1024);
689 if (storageBuf != null && bytesRead < maxLength) {
690 storageBuf.append(buf, 0, bReadCurrent);
691 }
692 bReadSession += bReadCurrent;
693 } while (br.ready() && readAll);
694 }
695 return bReadSession;
696 }
697
698 /**
699 * Returns the first line from a StringBuffer, recognized by the new line character \n.
700 *
701 * @param buffer The StringBuffer from which the first line is required.
702 * @return The first line of the buffer.
703 */
704 private String getFirstLine(StringBuffer buffer) {
705 int newLineIndex = 0;
706 newLineIndex = buffer.indexOf("\n");
707 if (newLineIndex == -1) {
708 return buffer.toString();
709 }
710 else {
711 return buffer.substring(0, newLineIndex);
712 }
713 }
714 }