001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.ssh; 019 020 import java.io.BufferedReader; 021 import java.io.File; 022 import java.io.FileWriter; 023 import java.io.IOException; 024 import java.io.InputStreamReader; 025 import java.util.Arrays; 026 import java.util.List; 027 import java.util.concurrent.Callable; 028 import org.apache.hadoop.util.StringUtils; 029 030 import org.apache.oozie.client.WorkflowAction; 031 import org.apache.oozie.client.OozieClient; 032 import org.apache.oozie.client.WorkflowAction.Status; 033 import org.apache.oozie.action.ActionExecutor; 034 import org.apache.oozie.action.ActionExecutorException; 035 import org.apache.oozie.service.CallbackService; 036 import org.apache.oozie.servlet.CallbackServlet; 037 import org.apache.oozie.service.Services; 038 import org.apache.oozie.util.IOUtils; 039 import org.apache.oozie.util.PropertiesUtils; 040 import org.apache.oozie.util.XLog; 041 import org.apache.oozie.util.XmlUtils; 042 import org.jdom.Element; 043 import org.jdom.JDOMException; 044 import org.jdom.Namespace; 045 046 /** 047 * Ssh action executor. <p/> <ul> <li>Execute the shell commands on the remote host</li> <li>Copies the base and wrapper 048 * scripts on to the remote location</li> <li>Base script is used to run the command on the remote host</li> <li>Wrapper 049 * script is used to check the status of the submitted command</li> <li>handles the submission failures</li> </ul> 050 */ 051 public class SshActionExecutor extends ActionExecutor { 052 public static final String ACTION_TYPE = "ssh"; 053 054 /** 055 * Configuration parameter which specifies whether the specified ssh user is allowed, or has to be the job user. 056 */ 057 public static final String CONF_SSH_ALLOW_USER_AT_HOST = CONF_PREFIX + "ssh.allow.user.at.host"; 058 059 protected static final String SSH_COMMAND_OPTIONS = 060 "-o PasswordAuthentication=no -o KbdInteractiveDevices=no -o StrictHostKeyChecking=no -o ConnectTimeout=20 "; 061 062 protected static final String SSH_COMMAND_BASE = "ssh " + SSH_COMMAND_OPTIONS; 063 protected static final String SCP_COMMAND_BASE = "scp " + SSH_COMMAND_OPTIONS; 064 065 public static final String ERR_SETUP_FAILED = "SETUP_FAILED"; 066 public static final String ERR_EXECUTION_FAILED = "EXECUTION_FAILED"; 067 public static final String ERR_UNKNOWN_ERROR = "UNKOWN_ERROR"; 068 public static final String ERR_COULD_NOT_CONNECT = "COULD_NOT_CONNECT"; 069 public static final String ERR_HOST_RESOLUTION = "COULD_NOT_RESOLVE_HOST"; 070 public static final String ERR_FNF = "FNF"; 071 public static final String ERR_AUTH_FAILED = "AUTH_FAILED"; 072 public static final String ERR_NO_EXEC_PERM = "NO_EXEC_PERM"; 073 public static final String ERR_USER_MISMATCH = "ERR_USER_MISMATCH"; 074 public static final String ERR_EXCEDE_LEN = "ERR_OUTPUT_EXCEED_MAX_LEN"; 075 076 public static final String DELETE_TMP_DIR = "oozie.action.ssh.delete.remote.tmp.dir"; 077 078 public static final String HTTP_COMMAND = "oozie.action.ssh.http.command"; 079 080 public static final String HTTP_COMMAND_OPTIONS = "oozie.action.ssh.http.command.post.options"; 081 082 private static final String EXT_STATUS_VAR = "#status"; 083 084 private static int maxLen; 085 private static boolean allowSshUserAtHost; 086 087 protected SshActionExecutor() { 088 super(ACTION_TYPE); 089 } 090 091 /** 092 * Initialize Action. 093 */ 094 @Override 095 public void initActionType() { 096 super.initActionType(); 097 maxLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024); 098 allowSshUserAtHost = getOozieConf().getBoolean(CONF_SSH_ALLOW_USER_AT_HOST, true); 099 registerError(InterruptedException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH001"); 100 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH002"); 101 initSshScripts(); 102 } 103 104 /** 105 * Check ssh action status. 106 * 107 * @param context action execution context. 108 * @param action action object. 109 */ 110 @Override 111 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 112 Status status = getActionStatus(context, action); 113 boolean captureOutput = false; 114 try { 115 Element eConf = XmlUtils.parseXml(action.getConf()); 116 Namespace ns = eConf.getNamespace(); 117 captureOutput = eConf.getChild("capture-output", ns) != null; 118 } 119 catch (JDOMException ex) { 120 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_XML_PARSE_FAILED", 121 "unknown error", ex); 122 } 123 XLog log = XLog.getLog(getClass()); 124 log.debug("Capture Output: {0}", captureOutput); 125 if (status == Status.OK) { 126 if (captureOutput) { 127 String outFile = getRemoteFileName(context, action, "stdout", false, true); 128 String dataCommand = SSH_COMMAND_BASE + action.getTrackerUri() + " cat " + outFile; 129 log.debug("Ssh command [{0}]", dataCommand); 130 try { 131 Process process = Runtime.getRuntime().exec(dataCommand.split("\\s")); 132 StringBuffer buffer = new StringBuffer(); 133 boolean overflow = false; 134 drainBuffers(process, buffer, null, maxLen); 135 if (buffer.length() > maxLen) { 136 overflow = true; 137 } 138 if (overflow) { 139 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, 140 "ERR_OUTPUT_EXCEED_MAX_LEN", "unknown error"); 141 } 142 context.setExecutionData(status.toString(), PropertiesUtils.stringToProperties(buffer.toString())); 143 } 144 catch (Exception ex) { 145 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_UNKNOWN_ERROR", 146 "unknown error", ex); 147 } 148 } 149 else { 150 context.setExecutionData(status.toString(), null); 151 } 152 } 153 else { 154 if (status == Status.ERROR) { 155 context.setExecutionData(status.toString(), null); 156 } 157 else { 158 context.setExternalStatus(status.toString()); 159 } 160 } 161 } 162 163 /** 164 * Kill ssh action. 165 * 166 * @param context action execution context. 167 * @param action object. 168 */ 169 @Override 170 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 171 String command = "ssh " + action.getTrackerUri() + " kill -KILL " + action.getExternalId(); 172 int returnValue = getReturnValue(command); 173 if (returnValue != 0) { 174 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_TO_KILL", XLog.format( 175 "Unable to kill process {0} on {1}", action.getExternalId(), action.getTrackerUri())); 176 } 177 context.setEndData(WorkflowAction.Status.KILLED, "ERROR"); 178 } 179 180 /** 181 * Start the ssh action execution. 182 * 183 * @param context action execution context. 184 * @param action action object. 185 */ 186 @SuppressWarnings("unchecked") 187 @Override 188 public void start(final Context context, final WorkflowAction action) throws ActionExecutorException { 189 XLog log = XLog.getLog(getClass()); 190 log.info("start() begins"); 191 String confStr = action.getConf(); 192 Element conf; 193 try { 194 conf = XmlUtils.parseXml(confStr); 195 } 196 catch (Exception ex) { 197 throw convertException(ex); 198 } 199 Namespace nameSpace = conf.getNamespace(); 200 Element hostElement = conf.getChild("host", nameSpace); 201 String hostString = hostElement.getValue().trim(); 202 hostString = prepareUserHost(hostString, context); 203 final String host = hostString; 204 final String dirLocation = execute(new Callable<String>() { 205 public String call() throws Exception { 206 return setupRemote(host, context, action); 207 } 208 209 }); 210 211 String runningPid = execute(new Callable<String>() { 212 public String call() throws Exception { 213 return checkIfRunning(host, context, action); 214 } 215 }); 216 String pid = ""; 217 218 if (runningPid == null) { 219 final Element commandElement = conf.getChild("command", nameSpace); 220 final boolean ignoreOutput = conf.getChild("capture-output", nameSpace) == null; 221 222 boolean preserve = false; 223 if (commandElement != null) { 224 String[] args = null; 225 // Will either have <args>, <arg>, or neither (but not both) 226 List<Element> argsList = conf.getChildren("args", nameSpace); 227 // Arguments in an <args> are "flattened" (spaces are delimiters) 228 if (argsList != null && argsList.size() > 0) { 229 StringBuilder argsString = new StringBuilder(""); 230 for (Element argsElement : argsList) { 231 argsString = argsString.append(argsElement.getValue()).append(" "); 232 } 233 args = new String[]{argsString.toString()}; 234 } 235 else { 236 // Arguments in an <arg> are preserved, even with spaces 237 argsList = conf.getChildren("arg", nameSpace); 238 if (argsList != null && argsList.size() > 0) { 239 preserve = true; 240 args = new String[argsList.size()]; 241 for (int i = 0; i < argsList.size(); i++) { 242 Element argsElement = argsList.get(i); 243 args[i] = argsElement.getValue(); 244 // Even though we're keeping the args as an array, if they contain a space we still have to either quote 245 // them or escape their space (because the scripts will split them up otherwise) 246 if (args[i].contains(" ") && 247 !(args[i].startsWith("\"") && args[i].endsWith("\"") || 248 args[i].startsWith("'") && args[i].endsWith("'"))) { 249 args[i] = StringUtils.escapeString(args[i], '\\', ' '); 250 } 251 } 252 } 253 } 254 final String[] argsF = args; 255 final String recoveryId = context.getRecoveryId(); 256 final boolean preserveF = preserve; 257 pid = execute(new Callable<String>() { 258 259 @Override 260 public String call() throws Exception { 261 return doExecute(host, dirLocation, commandElement.getValue(), argsF, ignoreOutput, action, recoveryId, 262 preserveF); 263 } 264 265 }); 266 } 267 context.setStartData(pid, host, host); 268 } 269 else { 270 pid = runningPid; 271 context.setStartData(pid, host, host); 272 check(context, action); 273 } 274 log.info("start() ends"); 275 } 276 277 private String checkIfRunning(String host, final Context context, final WorkflowAction action) { 278 String pid = null; 279 String outFile = getRemoteFileName(context, action, "pid", false, false); 280 String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile; 281 try { 282 Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s")); 283 StringBuffer buffer = new StringBuffer(); 284 drainBuffers(process, buffer, null, maxLen); 285 pid = getFirstLine(buffer); 286 287 if (Long.valueOf(pid) > 0) { 288 return pid; 289 } 290 else { 291 return null; 292 } 293 } 294 catch (Exception e) { 295 return null; 296 } 297 } 298 299 /** 300 * Get remote host working location. 301 * 302 * @param context action execution context 303 * @param action Action 304 * @param fileExtension Extension to be added to file name 305 * @param dirOnly Get the Directory only 306 * @param useExtId Flag to use external ID in the path 307 * @return remote host file name/Directory. 308 */ 309 public String getRemoteFileName(Context context, WorkflowAction action, String fileExtension, boolean dirOnly, 310 boolean useExtId) { 311 String path = getActionDirPath(context.getWorkflow().getId(), action, ACTION_TYPE, false) + "/"; 312 if (dirOnly) { 313 return path; 314 } 315 if (useExtId) { 316 path = path + action.getExternalId() + "."; 317 } 318 path = path + context.getRecoveryId() + "." + fileExtension; 319 return path; 320 } 321 322 /** 323 * Utility method to execute command. 324 * 325 * @param command Command to execute as String. 326 * @return exit status of the execution. 327 * @throws IOException if process exits with status nonzero. 328 * @throws InterruptedException if process does not run properly. 329 */ 330 public int executeCommand(String command) throws IOException, InterruptedException { 331 Runtime runtime = Runtime.getRuntime(); 332 Process p = runtime.exec(command.split("\\s")); 333 334 StringBuffer errorBuffer = new StringBuffer(); 335 int exitValue = drainBuffers(p, null, errorBuffer, maxLen); 336 337 String error = null; 338 if (exitValue != 0) { 339 error = getTruncatedString(errorBuffer); 340 throw new IOException(XLog.format("Not able to perform operation [{0}]", command) + " | " + "ErrorStream: " 341 + error); 342 } 343 return exitValue; 344 } 345 346 /** 347 * Do ssh action execution setup on remote host. 348 * 349 * @param host host name. 350 * @param context action execution context. 351 * @param action action object. 352 * @return remote host working directory. 353 * @throws IOException thrown if failed to setup. 354 * @throws InterruptedException thrown if any interruption happens. 355 */ 356 protected String setupRemote(String host, Context context, WorkflowAction action) throws IOException, InterruptedException { 357 XLog log = XLog.getLog(getClass()); 358 log.info("Attempting to copy ssh base scripts to remote host [{0}]", host); 359 String localDirLocation = Services.get().getRuntimeDir() + "/ssh"; 360 if (localDirLocation.endsWith("/")) { 361 localDirLocation = localDirLocation.substring(0, localDirLocation.length() - 1); 362 } 363 File file = new File(localDirLocation + "/ssh-base.sh"); 364 if (!file.exists()) { 365 throw new IOException("Required Local file " + file.getAbsolutePath() + " not present."); 366 } 367 file = new File(localDirLocation + "/ssh-wrapper.sh"); 368 if (!file.exists()) { 369 throw new IOException("Required Local file " + file.getAbsolutePath() + " not present."); 370 } 371 String remoteDirLocation = getRemoteFileName(context, action, null, true, true); 372 String command = XLog.format("{0}{1} mkdir -p {2} ", SSH_COMMAND_BASE, host, remoteDirLocation).toString(); 373 executeCommand(command); 374 command = XLog.format("{0}{1}/ssh-base.sh {2}/ssh-wrapper.sh {3}:{4}", SCP_COMMAND_BASE, localDirLocation, 375 localDirLocation, host, remoteDirLocation); 376 executeCommand(command); 377 command = XLog.format("{0}{1} chmod +x {2}ssh-base.sh {3}ssh-wrapper.sh ", SSH_COMMAND_BASE, host, 378 remoteDirLocation, remoteDirLocation); 379 executeCommand(command); 380 return remoteDirLocation; 381 } 382 383 /** 384 * Execute the ssh command. 385 * 386 * @param host hostname. 387 * @param dirLocation location of the base and wrapper scripts. 388 * @param cmnd command to be executed. 389 * @param args command arguments. 390 * @param ignoreOutput ignore output option. 391 * @param action action object. 392 * @param recoveryId action id + run number to enable recovery in rerun 393 * @param preserveArgs tell the ssh scripts to preserve or flatten the arguments 394 * @return process id of the running command. 395 * @throws IOException thrown if failed to run the command. 396 * @throws InterruptedException thrown if any interruption happens. 397 */ 398 protected String doExecute(String host, String dirLocation, String cmnd, String[] args, boolean ignoreOutput, 399 WorkflowAction action, String recoveryId, boolean preserveArgs) 400 throws IOException, InterruptedException { 401 XLog log = XLog.getLog(getClass()); 402 Runtime runtime = Runtime.getRuntime(); 403 String callbackPost = ignoreOutput ? "_" : getOozieConf().get(HTTP_COMMAND_OPTIONS).replace(" ", "%%%"); 404 String preserveArgsS = preserveArgs ? "PRESERVE_ARGS" : "FLATTEN_ARGS"; 405 // TODO check 406 String callBackUrl = Services.get().get(CallbackService.class) 407 .createCallBackUrl(action.getId(), EXT_STATUS_VAR); 408 String command = XLog.format("{0}{1} {2}ssh-base.sh {3} {4} \"{5}\" \"{6}\" {7} {8} ", SSH_COMMAND_BASE, host, dirLocation, 409 preserveArgsS, getOozieConf().get(HTTP_COMMAND), callBackUrl, callbackPost, recoveryId, cmnd) 410 .toString(); 411 String[] commandArray = command.split("\\s"); 412 String[] finalCommand; 413 if (args == null) { 414 finalCommand = commandArray; 415 } 416 else { 417 finalCommand = new String[commandArray.length + args.length]; 418 System.arraycopy(commandArray, 0, finalCommand, 0, commandArray.length); 419 System.arraycopy(args, 0, finalCommand, commandArray.length, args.length); 420 } 421 log.trace("Executing ssh command [{0}]", Arrays.toString(finalCommand)); 422 Process p = runtime.exec(finalCommand); 423 String pid = ""; 424 425 StringBuffer inputBuffer = new StringBuffer(); 426 StringBuffer errorBuffer = new StringBuffer(); 427 int exitValue = drainBuffers(p, inputBuffer, errorBuffer, maxLen); 428 429 pid = getFirstLine(inputBuffer); 430 431 String error = null; 432 if (exitValue != 0) { 433 error = getTruncatedString(errorBuffer); 434 throw new IOException(XLog.format("Not able to execute ssh-base.sh on {0}", host) + " | " + "ErrorStream: " 435 + error); 436 } 437 return pid; 438 } 439 440 /** 441 * End action execution. 442 * 443 * @param context action execution context. 444 * @param action action object. 445 * @throws ActionExecutorException thrown if action end execution fails. 446 */ 447 public void end(final Context context, final WorkflowAction action) throws ActionExecutorException { 448 if (action.getExternalStatus().equals("OK")) { 449 context.setEndData(WorkflowAction.Status.OK, WorkflowAction.Status.OK.toString()); 450 } 451 else { 452 context.setEndData(WorkflowAction.Status.ERROR, WorkflowAction.Status.ERROR.toString()); 453 } 454 boolean deleteTmpDir = getOozieConf().getBoolean(DELETE_TMP_DIR, true); 455 if (deleteTmpDir) { 456 String tmpDir = getRemoteFileName(context, action, null, true, false); 457 String removeTmpDirCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " rm -rf " + tmpDir; 458 int retVal = getReturnValue(removeTmpDirCmd); 459 if (retVal != 0) { 460 XLog.getLog(getClass()).warn("Cannot delete temp dir {0}", tmpDir); 461 } 462 } 463 } 464 465 /** 466 * Get the return value of a process. 467 * 468 * @param command command to be executed. 469 * @return zero if execution is successful and any non zero value for failure. 470 * @throws ActionExecutorException 471 */ 472 private int getReturnValue(String command) throws ActionExecutorException { 473 int returnValue; 474 Process ps = null; 475 try { 476 ps = Runtime.getRuntime().exec(command.split("\\s")); 477 returnValue = drainBuffers(ps, null, null, 0); 478 } 479 catch (IOException e) { 480 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog.format( 481 "Not able to perform operation {0}", command), e); 482 } 483 finally { 484 ps.destroy(); 485 } 486 return returnValue; 487 } 488 489 /** 490 * Copy the ssh base and wrapper scripts to the local directory. 491 */ 492 private void initSshScripts() { 493 String dirLocation = Services.get().getRuntimeDir() + "/ssh"; 494 File path = new File(dirLocation); 495 path.mkdirs(); 496 if (!path.exists()) { 497 throw new RuntimeException(XLog.format("Not able to create required directory {0}", dirLocation)); 498 } 499 try { 500 IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-base.sh", -1), new FileWriter(dirLocation 501 + "/ssh-base.sh")); 502 IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-wrapper.sh", -1), new FileWriter(dirLocation 503 + "/ssh-wrapper.sh")); 504 } 505 catch (IOException ie) { 506 throw new RuntimeException(XLog.format("Not able to copy required scripts file to {0} " 507 + "for SshActionHandler", dirLocation)); 508 } 509 } 510 511 /** 512 * Get action status. 513 * 514 * @param action action object. 515 * @return status of the action(RUNNING/OK/ERROR). 516 * @throws ActionExecutorException thrown if there is any error in getting status. 517 */ 518 protected Status getActionStatus(Context context, WorkflowAction action) throws ActionExecutorException { 519 String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " + action.getExternalId(); 520 Status aStatus; 521 int returnValue = getReturnValue(command); 522 if (returnValue == 0) { 523 aStatus = Status.RUNNING; 524 } 525 else { 526 String outFile = getRemoteFileName(context, action, "error", false, true); 527 String checkErrorCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " ls " + outFile; 528 int retVal = getReturnValue(checkErrorCmd); 529 if (retVal == 0) { 530 aStatus = Status.ERROR; 531 } 532 else { 533 aStatus = Status.OK; 534 } 535 } 536 return aStatus; 537 } 538 539 /** 540 * Execute the callable. 541 * 542 * @param callable required callable. 543 * @throws ActionExecutorException thrown if there is any error in command execution. 544 */ 545 private <T> T execute(Callable<T> callable) throws ActionExecutorException { 546 XLog log = XLog.getLog(getClass()); 547 try { 548 return callable.call(); 549 } 550 catch (IOException ex) { 551 log.warn("Error while executing ssh EXECUTION"); 552 String errorMessage = ex.getMessage(); 553 if (null == errorMessage) { // Unknown IOException 554 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex 555 .getMessage(), ex); 556 } // Host Resolution Issues 557 else { 558 if (errorMessage.contains("Could not resolve hostname") || 559 errorMessage.contains("service not known")) { 560 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_HOST_RESOLUTION, ex 561 .getMessage(), ex); 562 } // Connection Timeout. Host temporarily down. 563 else { 564 if (errorMessage.contains("timed out")) { 565 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_COULD_NOT_CONNECT, 566 ex.getMessage(), ex); 567 }// Local ssh-base or ssh-wrapper missing 568 else { 569 if (errorMessage.contains("Required Local file")) { 570 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF, 571 ex.getMessage(), ex); // local_FNF 572 }// Required oozie bash scripts missing, after the copy was 573 // successful 574 else { 575 if (errorMessage.contains("No such file or directory") 576 && (errorMessage.contains("ssh-base") || errorMessage.contains("ssh-wrapper"))) { 577 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF, 578 ex.getMessage(), ex); // remote 579 // FNF 580 } // Required application execution binary missing (either 581 // caught by ssh-wrapper 582 else { 583 if (errorMessage.contains("command not found")) { 584 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_FNF, ex 585 .getMessage(), ex); // remote 586 // FNF 587 } // Permission denied while connecting 588 else { 589 if (errorMessage.contains("Permission denied")) { 590 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, 591 ERR_AUTH_FAILED, ex.getMessage(), ex); 592 } // Permission denied while executing 593 else { 594 if (errorMessage.contains(": Permission denied")) { 595 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, 596 ERR_NO_EXEC_PERM, ex.getMessage(), ex); 597 } 598 else { 599 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, 600 ERR_UNKNOWN_ERROR, ex.getMessage(), ex); 601 } 602 } 603 } 604 } 605 } 606 } 607 } 608 } 609 } // Any other type of exception 610 catch (Exception ex) { 611 throw convertException(ex); 612 } 613 } 614 615 /** 616 * Checks whether the system is configured to always use the oozie user for ssh, and injects the user if required. 617 * 618 * @param host the host string. 619 * @param context the execution context. 620 * @return the modified host string with a user parameter added on if required. 621 * @throws ActionExecutorException in case the flag to use the oozie user is turned on and there is a mismatch 622 * between the user specified in the host and the oozie user. 623 */ 624 private String prepareUserHost(String host, Context context) throws ActionExecutorException { 625 String oozieUser = context.getProtoActionConf().get(OozieClient.USER_NAME); 626 if (allowSshUserAtHost) { 627 if (!host.contains("@")) { 628 host = oozieUser + "@" + host; 629 } 630 } 631 else { 632 if (host.contains("@")) { 633 if (!host.toLowerCase().startsWith(oozieUser + "@")) { 634 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_USER_MISMATCH, 635 XLog.format("user mismatch between oozie user [{0}] and ssh host [{1}]", 636 oozieUser, host)); 637 } 638 } 639 else { 640 host = oozieUser + "@" + host; 641 } 642 } 643 return host; 644 } 645 646 @Override 647 public boolean isCompleted(String externalStatus) { 648 return true; 649 } 650 651 /** 652 * Truncate the string to max length. 653 * 654 * @param strBuffer 655 * @return truncated string string 656 */ 657 private String getTruncatedString(StringBuffer strBuffer) { 658 659 if (strBuffer.length() <= maxLen) { 660 return strBuffer.toString(); 661 } 662 else { 663 return strBuffer.substring(0, maxLen); 664 } 665 } 666 667 /** 668 * Drains the inputStream and errorStream of the Process being executed. The contents of the streams are stored if a 669 * buffer is provided for the stream. 670 * 671 * @param p The Process instance. 672 * @param inputBuffer The buffer into which STDOUT is to be read. Can be null if only draining is required. 673 * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required. 674 * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the 675 * store content may exceed this length. 676 * @return the exit value of the process. 677 * @throws IOException 678 */ 679 private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength) 680 throws IOException { 681 int exitValue = -1; 682 BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream())); 683 BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream())); 684 685 int inBytesRead = 0; 686 int errBytesRead = 0; 687 688 boolean processEnded = false; 689 690 try { 691 while (!processEnded) { 692 try { 693 exitValue = p.exitValue(); 694 processEnded = true; 695 } 696 catch (IllegalThreadStateException ex) { 697 // Continue to drain. 698 } 699 700 inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded); 701 errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded); 702 } 703 } 704 finally { 705 ir.close(); 706 er.close(); 707 } 708 return exitValue; 709 } 710 711 /** 712 * Reads the contents of a stream and stores them into the provided buffer. 713 * 714 * @param br The stream to be read. 715 * @param storageBuf The buffer into which the contents of the stream are to be stored. 716 * @param maxLength The maximum number of bytes to be stored in the buffer. An indicative value and may be 717 * exceeded. 718 * @param bytesRead The number of bytes read from this stream to date. 719 * @param readAll If true, the stream is drained while their is data available in it. Otherwise, only a single chunk 720 * of data is read, irrespective of how much is available. 721 * @return 722 * @throws IOException 723 */ 724 private int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll) 725 throws IOException { 726 int bReadSession = 0; 727 if (br.ready()) { 728 char[] buf = new char[1024]; 729 do { 730 int bReadCurrent = br.read(buf, 0, 1024); 731 if (storageBuf != null && bytesRead < maxLength) { 732 storageBuf.append(buf, 0, bReadCurrent); 733 } 734 bReadSession += bReadCurrent; 735 } while (br.ready() && readAll); 736 } 737 return bReadSession; 738 } 739 740 /** 741 * Returns the first line from a StringBuffer, recognized by the new line character \n. 742 * 743 * @param buffer The StringBuffer from which the first line is required. 744 * @return The first line of the buffer. 745 */ 746 private String getFirstLine(StringBuffer buffer) { 747 int newLineIndex = 0; 748 newLineIndex = buffer.indexOf("\n"); 749 if (newLineIndex == -1) { 750 return buffer.toString(); 751 } 752 else { 753 return buffer.substring(0, newLineIndex); 754 } 755 } 756 }