001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action.ssh; 020 021import java.io.BufferedReader; 022import java.io.File; 023import java.io.FileWriter; 024import java.io.IOException; 025import java.io.InputStreamReader; 026import java.util.Arrays; 027import java.util.List; 028import java.util.concurrent.Callable; 029import org.apache.hadoop.util.StringUtils; 030 031import org.apache.oozie.client.WorkflowAction; 032import org.apache.oozie.client.OozieClient; 033import org.apache.oozie.client.WorkflowAction.Status; 034import org.apache.oozie.action.ActionExecutor; 035import org.apache.oozie.action.ActionExecutorException; 036import org.apache.oozie.service.CallbackService; 037import org.apache.oozie.service.ConfigurationService; 038import org.apache.oozie.servlet.CallbackServlet; 039import org.apache.oozie.service.Services; 040import org.apache.oozie.util.IOUtils; 041import org.apache.oozie.util.PropertiesUtils; 042import org.apache.oozie.util.XLog; 043import org.apache.oozie.util.XmlUtils; 044import org.jdom.Element; 045import org.jdom.JDOMException; 046import org.jdom.Namespace; 047 048/** 049 * Ssh action executor. <p> <ul> <li>Execute the shell commands on the remote host</li> <li>Copies the base and wrapper 050 * scripts on to the remote location</li> <li>Base script is used to run the command on the remote host</li> <li>Wrapper 051 * script is used to check the status of the submitted command</li> <li>handles the submission failures</li> </ul> 052 */ 053public class SshActionExecutor extends ActionExecutor { 054 public static final String ACTION_TYPE = "ssh"; 055 056 /** 057 * Configuration parameter which specifies whether the specified ssh user is allowed, or has to be the job user. 058 */ 059 public static final String CONF_SSH_ALLOW_USER_AT_HOST = CONF_PREFIX + "ssh.allow.user.at.host"; 060 061 protected static final String SSH_COMMAND_OPTIONS = 062 "-o PasswordAuthentication=no -o KbdInteractiveDevices=no -o StrictHostKeyChecking=no -o ConnectTimeout=20 "; 063 064 protected static final String SSH_COMMAND_BASE = "ssh " + SSH_COMMAND_OPTIONS; 065 protected static final String SCP_COMMAND_BASE = "scp " + SSH_COMMAND_OPTIONS; 066 067 public static final String ERR_SETUP_FAILED = "SETUP_FAILED"; 068 public static final String ERR_EXECUTION_FAILED = "EXECUTION_FAILED"; 069 public static final String ERR_UNKNOWN_ERROR = "UNKOWN_ERROR"; 070 public static final String ERR_COULD_NOT_CONNECT = "COULD_NOT_CONNECT"; 071 public static final String ERR_HOST_RESOLUTION = "COULD_NOT_RESOLVE_HOST"; 072 public static final String ERR_FNF = "FNF"; 073 public static final String ERR_AUTH_FAILED = "AUTH_FAILED"; 074 public static final String ERR_NO_EXEC_PERM = "NO_EXEC_PERM"; 075 public static final String ERR_USER_MISMATCH = "ERR_USER_MISMATCH"; 076 public static final String ERR_EXCEDE_LEN = "ERR_OUTPUT_EXCEED_MAX_LEN"; 077 078 public static final String DELETE_TMP_DIR = "oozie.action.ssh.delete.remote.tmp.dir"; 079 080 public static final String HTTP_COMMAND = "oozie.action.ssh.http.command"; 081 082 public static final String HTTP_COMMAND_OPTIONS = "oozie.action.ssh.http.command.post.options"; 083 084 private static final String EXT_STATUS_VAR = "#status"; 085 086 private static int maxLen; 087 private static boolean allowSshUserAtHost; 088 089 protected SshActionExecutor() { 090 super(ACTION_TYPE); 091 } 092 093 /** 094 * Initialize Action. 095 */ 096 @Override 097 public void initActionType() { 098 super.initActionType(); 099 maxLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024); 100 allowSshUserAtHost = ConfigurationService.getBoolean(CONF_SSH_ALLOW_USER_AT_HOST); 101 registerError(InterruptedException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH001"); 102 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH002"); 103 initSshScripts(); 104 } 105 106 /** 107 * Check ssh action status. 108 * 109 * @param context action execution context. 110 * @param action action object. 111 */ 112 @Override 113 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 114 Status status = getActionStatus(context, action); 115 boolean captureOutput = false; 116 try { 117 Element eConf = XmlUtils.parseXml(action.getConf()); 118 Namespace ns = eConf.getNamespace(); 119 captureOutput = eConf.getChild("capture-output", ns) != null; 120 } 121 catch (JDOMException ex) { 122 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_XML_PARSE_FAILED", 123 "unknown error", ex); 124 } 125 XLog log = XLog.getLog(getClass()); 126 log.debug("Capture Output: {0}", captureOutput); 127 if (status == Status.OK) { 128 if (captureOutput) { 129 String outFile = getRemoteFileName(context, action, "stdout", false, true); 130 String dataCommand = SSH_COMMAND_BASE + action.getTrackerUri() + " cat " + outFile; 131 log.debug("Ssh command [{0}]", dataCommand); 132 try { 133 Process process = Runtime.getRuntime().exec(dataCommand.split("\\s")); 134 StringBuffer buffer = new StringBuffer(); 135 boolean overflow = false; 136 drainBuffers(process, buffer, null, maxLen); 137 if (buffer.length() > maxLen) { 138 overflow = true; 139 } 140 if (overflow) { 141 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, 142 "ERR_OUTPUT_EXCEED_MAX_LEN", "unknown error"); 143 } 144 context.setExecutionData(status.toString(), PropertiesUtils.stringToProperties(buffer.toString())); 145 } 146 catch (Exception ex) { 147 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_UNKNOWN_ERROR", 148 "unknown error", ex); 149 } 150 } 151 else { 152 context.setExecutionData(status.toString(), null); 153 } 154 } 155 else { 156 if (status == Status.ERROR) { 157 context.setExecutionData(status.toString(), null); 158 } 159 else { 160 context.setExternalStatus(status.toString()); 161 } 162 } 163 } 164 165 /** 166 * Kill ssh action. 167 * 168 * @param context action execution context. 169 * @param action object. 170 */ 171 @Override 172 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 173 String command = "ssh " + action.getTrackerUri() + " kill -KILL " + action.getExternalId(); 174 int returnValue = getReturnValue(command); 175 if (returnValue != 0) { 176 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_TO_KILL", XLog.format( 177 "Unable to kill process {0} on {1}", action.getExternalId(), action.getTrackerUri())); 178 } 179 context.setEndData(WorkflowAction.Status.KILLED, "ERROR"); 180 } 181 182 /** 183 * Start the ssh action execution. 184 * 185 * @param context action execution context. 186 * @param action action object. 187 */ 188 @SuppressWarnings("unchecked") 189 @Override 190 public void start(final Context context, final WorkflowAction action) throws ActionExecutorException { 191 XLog log = XLog.getLog(getClass()); 192 log.info("start() begins"); 193 String confStr = action.getConf(); 194 Element conf; 195 try { 196 conf = XmlUtils.parseXml(confStr); 197 } 198 catch (Exception ex) { 199 throw convertException(ex); 200 } 201 Namespace nameSpace = conf.getNamespace(); 202 Element hostElement = conf.getChild("host", nameSpace); 203 String hostString = hostElement.getValue().trim(); 204 hostString = prepareUserHost(hostString, context); 205 final String host = hostString; 206 final String dirLocation = execute(new Callable<String>() { 207 public String call() throws Exception { 208 return setupRemote(host, context, action); 209 } 210 211 }); 212 213 String runningPid = execute(new Callable<String>() { 214 public String call() throws Exception { 215 return checkIfRunning(host, context, action); 216 } 217 }); 218 String pid = ""; 219 220 if (runningPid == null) { 221 final Element commandElement = conf.getChild("command", nameSpace); 222 final boolean ignoreOutput = conf.getChild("capture-output", nameSpace) == null; 223 224 boolean preserve = false; 225 if (commandElement != null) { 226 String[] args = null; 227 // Will either have <args>, <arg>, or neither (but not both) 228 List<Element> argsList = conf.getChildren("args", nameSpace); 229 // Arguments in an <args> are "flattened" (spaces are delimiters) 230 if (argsList != null && argsList.size() > 0) { 231 StringBuilder argsString = new StringBuilder(""); 232 for (Element argsElement : argsList) { 233 argsString = argsString.append(argsElement.getValue()).append(" "); 234 } 235 args = new String[]{argsString.toString()}; 236 } 237 else { 238 // Arguments in an <arg> are preserved, even with spaces 239 argsList = conf.getChildren("arg", nameSpace); 240 if (argsList != null && argsList.size() > 0) { 241 preserve = true; 242 args = new String[argsList.size()]; 243 for (int i = 0; i < argsList.size(); i++) { 244 Element argsElement = argsList.get(i); 245 args[i] = argsElement.getValue(); 246 // Even though we're keeping the args as an array, if they contain a space we still have to either quote 247 // them or escape their space (because the scripts will split them up otherwise) 248 if (args[i].contains(" ") && 249 !(args[i].startsWith("\"") && args[i].endsWith("\"") || 250 args[i].startsWith("'") && args[i].endsWith("'"))) { 251 args[i] = StringUtils.escapeString(args[i], '\\', ' '); 252 } 253 } 254 } 255 } 256 final String[] argsF = args; 257 final String recoveryId = context.getRecoveryId(); 258 final boolean preserveF = preserve; 259 pid = execute(new Callable<String>() { 260 261 @Override 262 public String call() throws Exception { 263 return doExecute(host, dirLocation, commandElement.getValue(), argsF, ignoreOutput, action, recoveryId, 264 preserveF); 265 } 266 267 }); 268 } 269 context.setStartData(pid, host, host); 270 } 271 else { 272 pid = runningPid; 273 context.setStartData(pid, host, host); 274 check(context, action); 275 } 276 log.info("start() ends"); 277 } 278 279 private String checkIfRunning(String host, final Context context, final WorkflowAction action) { 280 String pid = null; 281 String outFile = getRemoteFileName(context, action, "pid", false, false); 282 String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile; 283 try { 284 Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s")); 285 StringBuffer buffer = new StringBuffer(); 286 drainBuffers(process, buffer, null, maxLen); 287 pid = getFirstLine(buffer); 288 289 if (Long.valueOf(pid) > 0) { 290 return pid; 291 } 292 else { 293 return null; 294 } 295 } 296 catch (Exception e) { 297 return null; 298 } 299 } 300 301 /** 302 * Get remote host working location. 303 * 304 * @param context action execution context 305 * @param action Action 306 * @param fileExtension Extension to be added to file name 307 * @param dirOnly Get the Directory only 308 * @param useExtId Flag to use external ID in the path 309 * @return remote host file name/Directory. 310 */ 311 public String getRemoteFileName(Context context, WorkflowAction action, String fileExtension, boolean dirOnly, 312 boolean useExtId) { 313 String path = getActionDirPath(context.getWorkflow().getId(), action, ACTION_TYPE, false) + "/"; 314 if (dirOnly) { 315 return path; 316 } 317 if (useExtId) { 318 path = path + action.getExternalId() + "."; 319 } 320 path = path + context.getRecoveryId() + "." + fileExtension; 321 return path; 322 } 323 324 /** 325 * Utility method to execute command. 326 * 327 * @param command Command to execute as String. 328 * @return exit status of the execution. 329 * @throws IOException if process exits with status nonzero. 330 * @throws InterruptedException if process does not run properly. 331 */ 332 public int executeCommand(String command) throws IOException, InterruptedException { 333 Runtime runtime = Runtime.getRuntime(); 334 Process p = runtime.exec(command.split("\\s")); 335 336 StringBuffer errorBuffer = new StringBuffer(); 337 int exitValue = drainBuffers(p, null, errorBuffer, maxLen); 338 339 String error = null; 340 if (exitValue != 0) { 341 error = getTruncatedString(errorBuffer); 342 throw new IOException(XLog.format("Not able to perform operation [{0}]", command) + " | " + "ErrorStream: " 343 + error); 344 } 345 return exitValue; 346 } 347 348 /** 349 * Do ssh action execution setup on remote host. 350 * 351 * @param host host name. 352 * @param context action execution context. 353 * @param action action object. 354 * @return remote host working directory. 355 * @throws IOException thrown if failed to setup. 356 * @throws InterruptedException thrown if any interruption happens. 357 */ 358 protected String setupRemote(String host, Context context, WorkflowAction action) throws IOException, InterruptedException { 359 XLog log = XLog.getLog(getClass()); 360 log.info("Attempting to copy ssh base scripts to remote host [{0}]", host); 361 String localDirLocation = Services.get().getRuntimeDir() + "/ssh"; 362 if (localDirLocation.endsWith("/")) { 363 localDirLocation = localDirLocation.substring(0, localDirLocation.length() - 1); 364 } 365 File file = new File(localDirLocation + "/ssh-base.sh"); 366 if (!file.exists()) { 367 throw new IOException("Required Local file " + file.getAbsolutePath() + " not present."); 368 } 369 file = new File(localDirLocation + "/ssh-wrapper.sh"); 370 if (!file.exists()) { 371 throw new IOException("Required Local file " + file.getAbsolutePath() + " not present."); 372 } 373 String remoteDirLocation = getRemoteFileName(context, action, null, true, true); 374 String command = XLog.format("{0}{1} mkdir -p {2} ", SSH_COMMAND_BASE, host, remoteDirLocation).toString(); 375 executeCommand(command); 376 command = XLog.format("{0}{1}/ssh-base.sh {2}/ssh-wrapper.sh {3}:{4}", SCP_COMMAND_BASE, localDirLocation, 377 localDirLocation, host, remoteDirLocation); 378 executeCommand(command); 379 command = XLog.format("{0}{1} chmod +x {2}ssh-base.sh {3}ssh-wrapper.sh ", SSH_COMMAND_BASE, host, 380 remoteDirLocation, remoteDirLocation); 381 executeCommand(command); 382 return remoteDirLocation; 383 } 384 385 /** 386 * Execute the ssh command. 387 * 388 * @param host hostname. 389 * @param dirLocation location of the base and wrapper scripts. 390 * @param cmnd command to be executed. 391 * @param args command arguments. 392 * @param ignoreOutput ignore output option. 393 * @param action action object. 394 * @param recoveryId action id + run number to enable recovery in rerun 395 * @param preserveArgs tell the ssh scripts to preserve or flatten the arguments 396 * @return process id of the running command. 397 * @throws IOException thrown if failed to run the command. 398 * @throws InterruptedException thrown if any interruption happens. 399 */ 400 protected String doExecute(String host, String dirLocation, String cmnd, String[] args, boolean ignoreOutput, 401 WorkflowAction action, String recoveryId, boolean preserveArgs) 402 throws IOException, InterruptedException { 403 XLog log = XLog.getLog(getClass()); 404 Runtime runtime = Runtime.getRuntime(); 405 String callbackPost = ignoreOutput ? "_" : ConfigurationService.get(HTTP_COMMAND_OPTIONS).replace(" ", "%%%"); 406 String preserveArgsS = preserveArgs ? "PRESERVE_ARGS" : "FLATTEN_ARGS"; 407 // TODO check 408 String callBackUrl = Services.get().get(CallbackService.class) 409 .createCallBackUrl(action.getId(), EXT_STATUS_VAR); 410 String command = XLog.format("{0}{1} {2}ssh-base.sh {3} {4} \"{5}\" \"{6}\" {7} {8} ", SSH_COMMAND_BASE, host, dirLocation, 411 preserveArgsS, ConfigurationService.get(HTTP_COMMAND), callBackUrl, callbackPost, recoveryId, cmnd) 412 .toString(); 413 String[] commandArray = command.split("\\s"); 414 String[] finalCommand; 415 if (args == null) { 416 finalCommand = commandArray; 417 } 418 else { 419 finalCommand = new String[commandArray.length + args.length]; 420 System.arraycopy(commandArray, 0, finalCommand, 0, commandArray.length); 421 System.arraycopy(args, 0, finalCommand, commandArray.length, args.length); 422 } 423 log.trace("Executing ssh command [{0}]", Arrays.toString(finalCommand)); 424 Process p = runtime.exec(finalCommand); 425 String pid = ""; 426 427 StringBuffer inputBuffer = new StringBuffer(); 428 StringBuffer errorBuffer = new StringBuffer(); 429 int exitValue = drainBuffers(p, inputBuffer, errorBuffer, maxLen); 430 431 pid = getFirstLine(inputBuffer); 432 433 String error = null; 434 if (exitValue != 0) { 435 error = getTruncatedString(errorBuffer); 436 throw new IOException(XLog.format("Not able to execute ssh-base.sh on {0}", host) + " | " + "ErrorStream: " 437 + error); 438 } 439 return pid; 440 } 441 442 /** 443 * End action execution. 444 * 445 * @param context action execution context. 446 * @param action action object. 447 * @throws ActionExecutorException thrown if action end execution fails. 448 */ 449 public void end(final Context context, final WorkflowAction action) throws ActionExecutorException { 450 if (action.getExternalStatus().equals("OK")) { 451 context.setEndData(WorkflowAction.Status.OK, WorkflowAction.Status.OK.toString()); 452 } 453 else { 454 context.setEndData(WorkflowAction.Status.ERROR, WorkflowAction.Status.ERROR.toString()); 455 } 456 boolean deleteTmpDir = ConfigurationService.getBoolean(DELETE_TMP_DIR); 457 if (deleteTmpDir) { 458 String tmpDir = getRemoteFileName(context, action, null, true, false); 459 String removeTmpDirCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " rm -rf " + tmpDir; 460 int retVal = getReturnValue(removeTmpDirCmd); 461 if (retVal != 0) { 462 XLog.getLog(getClass()).warn("Cannot delete temp dir {0}", tmpDir); 463 } 464 } 465 } 466 467 /** 468 * Get the return value of a process. 469 * 470 * @param command command to be executed. 471 * @return zero if execution is successful and any non zero value for failure. 472 * @throws ActionExecutorException 473 */ 474 private int getReturnValue(String command) throws ActionExecutorException { 475 int returnValue; 476 Process ps = null; 477 try { 478 ps = Runtime.getRuntime().exec(command.split("\\s")); 479 returnValue = drainBuffers(ps, null, null, 0); 480 } 481 catch (IOException e) { 482 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog.format( 483 "Not able to perform operation {0}", command), e); 484 } 485 finally { 486 ps.destroy(); 487 } 488 return returnValue; 489 } 490 491 /** 492 * Copy the ssh base and wrapper scripts to the local directory. 493 */ 494 private void initSshScripts() { 495 String dirLocation = Services.get().getRuntimeDir() + "/ssh"; 496 File path = new File(dirLocation); 497 path.mkdirs(); 498 if (!path.exists()) { 499 throw new RuntimeException(XLog.format("Not able to create required directory {0}", dirLocation)); 500 } 501 try { 502 IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-base.sh", -1), new FileWriter(dirLocation 503 + "/ssh-base.sh")); 504 IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-wrapper.sh", -1), new FileWriter(dirLocation 505 + "/ssh-wrapper.sh")); 506 } 507 catch (IOException ie) { 508 throw new RuntimeException(XLog.format("Not able to copy required scripts file to {0} " 509 + "for SshActionHandler", dirLocation)); 510 } 511 } 512 513 /** 514 * Get action status. 515 * 516 * @param action action object. 517 * @return status of the action(RUNNING/OK/ERROR). 518 * @throws ActionExecutorException thrown if there is any error in getting status. 519 */ 520 protected Status getActionStatus(Context context, WorkflowAction action) throws ActionExecutorException { 521 String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " + action.getExternalId(); 522 Status aStatus; 523 int returnValue = getReturnValue(command); 524 if (returnValue == 0) { 525 aStatus = Status.RUNNING; 526 } 527 else { 528 String outFile = getRemoteFileName(context, action, "error", false, true); 529 String checkErrorCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " ls " + outFile; 530 int retVal = getReturnValue(checkErrorCmd); 531 if (retVal == 0) { 532 aStatus = Status.ERROR; 533 } 534 else { 535 aStatus = Status.OK; 536 } 537 } 538 return aStatus; 539 } 540 541 /** 542 * Execute the callable. 543 * 544 * @param callable required callable. 545 * @throws ActionExecutorException thrown if there is any error in command execution. 546 */ 547 private <T> T execute(Callable<T> callable) throws ActionExecutorException { 548 XLog log = XLog.getLog(getClass()); 549 try { 550 return callable.call(); 551 } 552 catch (IOException ex) { 553 log.warn("Error while executing ssh EXECUTION"); 554 String errorMessage = ex.getMessage(); 555 if (null == errorMessage) { // Unknown IOException 556 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex 557 .getMessage(), ex); 558 } // Host Resolution Issues 559 else { 560 if (errorMessage.contains("Could not resolve hostname") || 561 errorMessage.contains("service not known")) { 562 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_HOST_RESOLUTION, ex 563 .getMessage(), ex); 564 } // Connection Timeout. Host temporarily down. 565 else { 566 if (errorMessage.contains("timed out")) { 567 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_COULD_NOT_CONNECT, 568 ex.getMessage(), ex); 569 }// Local ssh-base or ssh-wrapper missing 570 else { 571 if (errorMessage.contains("Required Local file")) { 572 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF, 573 ex.getMessage(), ex); // local_FNF 574 }// Required oozie bash scripts missing, after the copy was 575 // successful 576 else { 577 if (errorMessage.contains("No such file or directory") 578 && (errorMessage.contains("ssh-base") || errorMessage.contains("ssh-wrapper"))) { 579 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF, 580 ex.getMessage(), ex); // remote 581 // FNF 582 } // Required application execution binary missing (either 583 // caught by ssh-wrapper 584 else { 585 if (errorMessage.contains("command not found")) { 586 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_FNF, ex 587 .getMessage(), ex); // remote 588 // FNF 589 } // Permission denied while connecting 590 else { 591 if (errorMessage.contains("Permission denied")) { 592 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, 593 ERR_AUTH_FAILED, ex.getMessage(), ex); 594 } // Permission denied while executing 595 else { 596 if (errorMessage.contains(": Permission denied")) { 597 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, 598 ERR_NO_EXEC_PERM, ex.getMessage(), ex); 599 } 600 else { 601 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, 602 ERR_UNKNOWN_ERROR, ex.getMessage(), ex); 603 } 604 } 605 } 606 } 607 } 608 } 609 } 610 } 611 } // Any other type of exception 612 catch (Exception ex) { 613 throw convertException(ex); 614 } 615 } 616 617 /** 618 * Checks whether the system is configured to always use the oozie user for ssh, and injects the user if required. 619 * 620 * @param host the host string. 621 * @param context the execution context. 622 * @return the modified host string with a user parameter added on if required. 623 * @throws ActionExecutorException in case the flag to use the oozie user is turned on and there is a mismatch 624 * between the user specified in the host and the oozie user. 625 */ 626 private String prepareUserHost(String host, Context context) throws ActionExecutorException { 627 String oozieUser = context.getProtoActionConf().get(OozieClient.USER_NAME); 628 if (allowSshUserAtHost) { 629 if (!host.contains("@")) { 630 host = oozieUser + "@" + host; 631 } 632 } 633 else { 634 if (host.contains("@")) { 635 if (!host.toLowerCase().startsWith(oozieUser + "@")) { 636 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_USER_MISMATCH, 637 XLog.format("user mismatch between oozie user [{0}] and ssh host [{1}]", 638 oozieUser, host)); 639 } 640 } 641 else { 642 host = oozieUser + "@" + host; 643 } 644 } 645 return host; 646 } 647 648 @Override 649 public boolean isCompleted(String externalStatus) { 650 return true; 651 } 652 653 /** 654 * Truncate the string to max length. 655 * 656 * @param strBuffer 657 * @return truncated string string 658 */ 659 private String getTruncatedString(StringBuffer strBuffer) { 660 661 if (strBuffer.length() <= maxLen) { 662 return strBuffer.toString(); 663 } 664 else { 665 return strBuffer.substring(0, maxLen); 666 } 667 } 668 669 /** 670 * Drains the inputStream and errorStream of the Process being executed. The contents of the streams are stored if a 671 * buffer is provided for the stream. 672 * 673 * @param p The Process instance. 674 * @param inputBuffer The buffer into which STDOUT is to be read. Can be null if only draining is required. 675 * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required. 676 * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the 677 * store content may exceed this length. 678 * @return the exit value of the process. 679 * @throws IOException 680 */ 681 private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength) 682 throws IOException { 683 int exitValue = -1; 684 BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream())); 685 BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream())); 686 687 int inBytesRead = 0; 688 int errBytesRead = 0; 689 690 boolean processEnded = false; 691 692 try { 693 while (!processEnded) { 694 try { 695 exitValue = p.exitValue(); 696 processEnded = true; 697 } 698 catch (IllegalThreadStateException ex) { 699 // Continue to drain. 700 } 701 702 inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded); 703 errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded); 704 } 705 } 706 finally { 707 ir.close(); 708 er.close(); 709 } 710 return exitValue; 711 } 712 713 /** 714 * Reads the contents of a stream and stores them into the provided buffer. 715 * 716 * @param br The stream to be read. 717 * @param storageBuf The buffer into which the contents of the stream are to be stored. 718 * @param maxLength The maximum number of bytes to be stored in the buffer. An indicative value and may be 719 * exceeded. 720 * @param bytesRead The number of bytes read from this stream to date. 721 * @param readAll If true, the stream is drained while their is data available in it. Otherwise, only a single chunk 722 * of data is read, irrespective of how much is available. 723 * @return 724 * @throws IOException 725 */ 726 private int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll) 727 throws IOException { 728 int bReadSession = 0; 729 if (br.ready()) { 730 char[] buf = new char[1024]; 731 do { 732 int bReadCurrent = br.read(buf, 0, 1024); 733 if (storageBuf != null && bytesRead < maxLength) { 734 storageBuf.append(buf, 0, bReadCurrent); 735 } 736 bReadSession += bReadCurrent; 737 } while (br.ready() && readAll); 738 } 739 return bReadSession; 740 } 741 742 /** 743 * Returns the first line from a StringBuffer, recognized by the new line character \n. 744 * 745 * @param buffer The StringBuffer from which the first line is required. 746 * @return The first line of the buffer. 747 */ 748 private String getFirstLine(StringBuffer buffer) { 749 int newLineIndex = 0; 750 newLineIndex = buffer.indexOf("\n"); 751 if (newLineIndex == -1) { 752 return buffer.toString(); 753 } 754 else { 755 return buffer.substring(0, newLineIndex); 756 } 757 } 758}