This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.ssh;
019
020 import java.io.BufferedReader;
021 import java.io.File;
022 import java.io.FileWriter;
023 import java.io.IOException;
024 import java.io.InputStreamReader;
025 import java.util.Arrays;
026 import java.util.List;
027 import java.util.concurrent.Callable;
028 import org.apache.hadoop.util.StringUtils;
029
030 import org.apache.oozie.client.WorkflowAction;
031 import org.apache.oozie.client.OozieClient;
032 import org.apache.oozie.client.WorkflowAction.Status;
033 import org.apache.oozie.action.ActionExecutor;
034 import org.apache.oozie.action.ActionExecutorException;
035 import org.apache.oozie.service.CallbackService;
036 import org.apache.oozie.servlet.CallbackServlet;
037 import org.apache.oozie.service.Services;
038 import org.apache.oozie.util.IOUtils;
039 import org.apache.oozie.util.PropertiesUtils;
040 import org.apache.oozie.util.XLog;
041 import org.apache.oozie.util.XmlUtils;
042 import org.jdom.Element;
043 import org.jdom.JDOMException;
044 import org.jdom.Namespace;
045
046 /**
047 * Ssh action executor. <p/> <ul> <li>Execute the shell commands on the remote host</li> <li>Copies the base and wrapper
048 * scripts on to the remote location</li> <li>Base script is used to run the command on the remote host</li> <li>Wrapper
049 * script is used to check the status of the submitted command</li> <li>handles the submission failures</li> </ul>
050 */
051 public class SshActionExecutor extends ActionExecutor {
052 public static final String ACTION_TYPE = "ssh";
053
054 /**
055 * Configuration parameter which specifies whether the specified ssh user is allowed, or has to be the job user.
056 */
057 public static final String CONF_SSH_ALLOW_USER_AT_HOST = CONF_PREFIX + "ssh.allow.user.at.host";
058
059 protected static final String SSH_COMMAND_OPTIONS =
060 "-o PasswordAuthentication=no -o KbdInteractiveDevices=no -o StrictHostKeyChecking=no -o ConnectTimeout=20 ";
061
062 protected static final String SSH_COMMAND_BASE = "ssh " + SSH_COMMAND_OPTIONS;
063 protected static final String SCP_COMMAND_BASE = "scp " + SSH_COMMAND_OPTIONS;
064
065 public static final String ERR_SETUP_FAILED = "SETUP_FAILED";
066 public static final String ERR_EXECUTION_FAILED = "EXECUTION_FAILED";
067 public static final String ERR_UNKNOWN_ERROR = "UNKOWN_ERROR";
068 public static final String ERR_COULD_NOT_CONNECT = "COULD_NOT_CONNECT";
069 public static final String ERR_HOST_RESOLUTION = "COULD_NOT_RESOLVE_HOST";
070 public static final String ERR_FNF = "FNF";
071 public static final String ERR_AUTH_FAILED = "AUTH_FAILED";
072 public static final String ERR_NO_EXEC_PERM = "NO_EXEC_PERM";
073 public static final String ERR_USER_MISMATCH = "ERR_USER_MISMATCH";
074 public static final String ERR_EXCEDE_LEN = "ERR_OUTPUT_EXCEED_MAX_LEN";
075
076 public static final String DELETE_TMP_DIR = "oozie.action.ssh.delete.remote.tmp.dir";
077
078 public static final String HTTP_COMMAND = "oozie.action.ssh.http.command";
079
080 public static final String HTTP_COMMAND_OPTIONS = "oozie.action.ssh.http.command.post.options";
081
082 private static final String EXT_STATUS_VAR = "#status";
083
084 private static int maxLen;
085 private static boolean allowSshUserAtHost;
086
087 protected SshActionExecutor() {
088 super(ACTION_TYPE);
089 }
090
091 /**
092 * Initialize Action.
093 */
094 @Override
095 public void initActionType() {
096 super.initActionType();
097 maxLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024);
098 allowSshUserAtHost = getOozieConf().getBoolean(CONF_SSH_ALLOW_USER_AT_HOST, true);
099 registerError(InterruptedException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH001");
100 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH002");
101 initSshScripts();
102 }
103
104 /**
105 * Check ssh action status.
106 *
107 * @param context action execution context.
108 * @param action action object.
109 */
110 @Override
111 public void check(Context context, WorkflowAction action) throws ActionExecutorException {
112 Status status = getActionStatus(context, action);
113 boolean captureOutput = false;
114 try {
115 Element eConf = XmlUtils.parseXml(action.getConf());
116 Namespace ns = eConf.getNamespace();
117 captureOutput = eConf.getChild("capture-output", ns) != null;
118 }
119 catch (JDOMException ex) {
120 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_XML_PARSE_FAILED",
121 "unknown error", ex);
122 }
123 XLog log = XLog.getLog(getClass());
124 log.debug("Capture Output: {0}", captureOutput);
125 if (status == Status.OK) {
126 if (captureOutput) {
127 String outFile = getRemoteFileName(context, action, "stdout", false, true);
128 String dataCommand = SSH_COMMAND_BASE + action.getTrackerUri() + " cat " + outFile;
129 log.debug("Ssh command [{0}]", dataCommand);
130 try {
131 Process process = Runtime.getRuntime().exec(dataCommand.split("\\s"));
132 StringBuffer buffer = new StringBuffer();
133 boolean overflow = false;
134 drainBuffers(process, buffer, null, maxLen);
135 if (buffer.length() > maxLen) {
136 overflow = true;
137 }
138 if (overflow) {
139 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR,
140 "ERR_OUTPUT_EXCEED_MAX_LEN", "unknown error");
141 }
142 context.setExecutionData(status.toString(), PropertiesUtils.stringToProperties(buffer.toString()));
143 }
144 catch (Exception ex) {
145 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_UNKNOWN_ERROR",
146 "unknown error", ex);
147 }
148 }
149 else {
150 context.setExecutionData(status.toString(), null);
151 }
152 }
153 else {
154 if (status == Status.ERROR) {
155 context.setExecutionData(status.toString(), null);
156 }
157 else {
158 context.setExternalStatus(status.toString());
159 }
160 }
161 }
162
163 /**
164 * Kill ssh action.
165 *
166 * @param context action execution context.
167 * @param action object.
168 */
169 @Override
170 public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
171 String command = "ssh " + action.getTrackerUri() + " kill -KILL " + action.getExternalId();
172 int returnValue = getReturnValue(command);
173 if (returnValue != 0) {
174 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_TO_KILL", XLog.format(
175 "Unable to kill process {0} on {1}", action.getExternalId(), action.getTrackerUri()));
176 }
177 context.setEndData(WorkflowAction.Status.KILLED, "ERROR");
178 }
179
180 /**
181 * Start the ssh action execution.
182 *
183 * @param context action execution context.
184 * @param action action object.
185 */
186 @SuppressWarnings("unchecked")
187 @Override
188 public void start(final Context context, final WorkflowAction action) throws ActionExecutorException {
189 XLog log = XLog.getLog(getClass());
190 log.info("start() begins");
191 String confStr = action.getConf();
192 Element conf;
193 try {
194 conf = XmlUtils.parseXml(confStr);
195 }
196 catch (Exception ex) {
197 throw convertException(ex);
198 }
199 Namespace nameSpace = conf.getNamespace();
200 Element hostElement = conf.getChild("host", nameSpace);
201 String hostString = hostElement.getValue().trim();
202 hostString = prepareUserHost(hostString, context);
203 final String host = hostString;
204 final String dirLocation = execute(new Callable<String>() {
205 public String call() throws Exception {
206 return setupRemote(host, context, action);
207 }
208
209 });
210
211 String runningPid = execute(new Callable<String>() {
212 public String call() throws Exception {
213 return checkIfRunning(host, context, action);
214 }
215 });
216 String pid = "";
217
218 if (runningPid == null) {
219 final Element commandElement = conf.getChild("command", nameSpace);
220 final boolean ignoreOutput = conf.getChild("capture-output", nameSpace) == null;
221
222 boolean preserve = false;
223 if (commandElement != null) {
224 String[] args = null;
225 // Will either have <args>, <arg>, or neither (but not both)
226 List<Element> argsList = conf.getChildren("args", nameSpace);
227 // Arguments in an <args> are "flattened" (spaces are delimiters)
228 if (argsList != null && argsList.size() > 0) {
229 StringBuilder argsString = new StringBuilder("");
230 for (Element argsElement : argsList) {
231 argsString = argsString.append(argsElement.getValue()).append(" ");
232 }
233 args = new String[]{argsString.toString()};
234 }
235 else {
236 // Arguments in an <arg> are preserved, even with spaces
237 argsList = conf.getChildren("arg", nameSpace);
238 if (argsList != null && argsList.size() > 0) {
239 preserve = true;
240 args = new String[argsList.size()];
241 for (int i = 0; i < argsList.size(); i++) {
242 Element argsElement = argsList.get(i);
243 args[i] = argsElement.getValue();
244 // Even though we're keeping the args as an array, if they contain a space we still have to either quote
245 // them or escape their space (because the scripts will split them up otherwise)
246 if (args[i].contains(" ") &&
247 !(args[i].startsWith("\"") && args[i].endsWith("\"") ||
248 args[i].startsWith("'") && args[i].endsWith("'"))) {
249 args[i] = StringUtils.escapeString(args[i], '\\', ' ');
250 }
251 }
252 }
253 }
254 final String[] argsF = args;
255 final String recoveryId = context.getRecoveryId();
256 final boolean preserveF = preserve;
257 pid = execute(new Callable<String>() {
258
259 @Override
260 public String call() throws Exception {
261 return doExecute(host, dirLocation, commandElement.getValue(), argsF, ignoreOutput, action, recoveryId,
262 preserveF);
263 }
264
265 });
266 }
267 context.setStartData(pid, host, host);
268 }
269 else {
270 pid = runningPid;
271 context.setStartData(pid, host, host);
272 check(context, action);
273 }
274 log.info("start() ends");
275 }
276
277 private String checkIfRunning(String host, final Context context, final WorkflowAction action) {
278 String pid = null;
279 String outFile = getRemoteFileName(context, action, "pid", false, false);
280 String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile;
281 try {
282 Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s"));
283 StringBuffer buffer = new StringBuffer();
284 drainBuffers(process, buffer, null, maxLen);
285 pid = getFirstLine(buffer);
286
287 if (Long.valueOf(pid) > 0) {
288 return pid;
289 }
290 else {
291 return null;
292 }
293 }
294 catch (Exception e) {
295 return null;
296 }
297 }
298
299 /**
300 * Get remote host working location.
301 *
302 * @param context action execution context
303 * @param action Action
304 * @param fileExtension Extension to be added to file name
305 * @param dirOnly Get the Directory only
306 * @param useExtId Flag to use external ID in the path
307 * @return remote host file name/Directory.
308 */
309 public String getRemoteFileName(Context context, WorkflowAction action, String fileExtension, boolean dirOnly,
310 boolean useExtId) {
311 String path = getActionDirPath(context.getWorkflow().getId(), action, ACTION_TYPE, false) + "/";
312 if (dirOnly) {
313 return path;
314 }
315 if (useExtId) {
316 path = path + action.getExternalId() + ".";
317 }
318 path = path + context.getRecoveryId() + "." + fileExtension;
319 return path;
320 }
321
322 /**
323 * Utility method to execute command.
324 *
325 * @param command Command to execute as String.
326 * @return exit status of the execution.
327 * @throws IOException if process exits with status nonzero.
328 * @throws InterruptedException if process does not run properly.
329 */
330 public int executeCommand(String command) throws IOException, InterruptedException {
331 Runtime runtime = Runtime.getRuntime();
332 Process p = runtime.exec(command.split("\\s"));
333
334 StringBuffer errorBuffer = new StringBuffer();
335 int exitValue = drainBuffers(p, null, errorBuffer, maxLen);
336
337 String error = null;
338 if (exitValue != 0) {
339 error = getTruncatedString(errorBuffer);
340 throw new IOException(XLog.format("Not able to perform operation [{0}]", command) + " | " + "ErrorStream: "
341 + error);
342 }
343 return exitValue;
344 }
345
346 /**
347 * Do ssh action execution setup on remote host.
348 *
349 * @param host host name.
350 * @param context action execution context.
351 * @param action action object.
352 * @return remote host working directory.
353 * @throws IOException thrown if failed to setup.
354 * @throws InterruptedException thrown if any interruption happens.
355 */
356 protected String setupRemote(String host, Context context, WorkflowAction action) throws IOException, InterruptedException {
357 XLog log = XLog.getLog(getClass());
358 log.info("Attempting to copy ssh base scripts to remote host [{0}]", host);
359 String localDirLocation = Services.get().getRuntimeDir() + "/ssh";
360 if (localDirLocation.endsWith("/")) {
361 localDirLocation = localDirLocation.substring(0, localDirLocation.length() - 1);
362 }
363 File file = new File(localDirLocation + "/ssh-base.sh");
364 if (!file.exists()) {
365 throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
366 }
367 file = new File(localDirLocation + "/ssh-wrapper.sh");
368 if (!file.exists()) {
369 throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
370 }
371 String remoteDirLocation = getRemoteFileName(context, action, null, true, true);
372 String command = XLog.format("{0}{1} mkdir -p {2} ", SSH_COMMAND_BASE, host, remoteDirLocation).toString();
373 executeCommand(command);
374 command = XLog.format("{0}{1}/ssh-base.sh {2}/ssh-wrapper.sh {3}:{4}", SCP_COMMAND_BASE, localDirLocation,
375 localDirLocation, host, remoteDirLocation);
376 executeCommand(command);
377 command = XLog.format("{0}{1} chmod +x {2}ssh-base.sh {3}ssh-wrapper.sh ", SSH_COMMAND_BASE, host,
378 remoteDirLocation, remoteDirLocation);
379 executeCommand(command);
380 return remoteDirLocation;
381 }
382
383 /**
384 * Execute the ssh command.
385 *
386 * @param host hostname.
387 * @param dirLocation location of the base and wrapper scripts.
388 * @param cmnd command to be executed.
389 * @param args command arguments.
390 * @param ignoreOutput ignore output option.
391 * @param action action object.
392 * @param recoveryId action id + run number to enable recovery in rerun
393 * @param preserveArgs tell the ssh scripts to preserve or flatten the arguments
394 * @return process id of the running command.
395 * @throws IOException thrown if failed to run the command.
396 * @throws InterruptedException thrown if any interruption happens.
397 */
398 protected String doExecute(String host, String dirLocation, String cmnd, String[] args, boolean ignoreOutput,
399 WorkflowAction action, String recoveryId, boolean preserveArgs)
400 throws IOException, InterruptedException {
401 XLog log = XLog.getLog(getClass());
402 Runtime runtime = Runtime.getRuntime();
403 String callbackPost = ignoreOutput ? "_" : getOozieConf().get(HTTP_COMMAND_OPTIONS).replace(" ", "%%%");
404 String preserveArgsS = preserveArgs ? "PRESERVE_ARGS" : "FLATTEN_ARGS";
405 // TODO check
406 String callBackUrl = Services.get().get(CallbackService.class)
407 .createCallBackUrl(action.getId(), EXT_STATUS_VAR);
408 String command = XLog.format("{0}{1} {2}ssh-base.sh {3} {4} \"{5}\" \"{6}\" {7} {8} ", SSH_COMMAND_BASE, host, dirLocation,
409 preserveArgsS, getOozieConf().get(HTTP_COMMAND), callBackUrl, callbackPost, recoveryId, cmnd)
410 .toString();
411 String[] commandArray = command.split("\\s");
412 String[] finalCommand;
413 if (args == null) {
414 finalCommand = commandArray;
415 }
416 else {
417 finalCommand = new String[commandArray.length + args.length];
418 System.arraycopy(commandArray, 0, finalCommand, 0, commandArray.length);
419 System.arraycopy(args, 0, finalCommand, commandArray.length, args.length);
420 }
421 log.trace("Executing ssh command [{0}]", Arrays.toString(finalCommand));
422 Process p = runtime.exec(finalCommand);
423 String pid = "";
424
425 StringBuffer inputBuffer = new StringBuffer();
426 StringBuffer errorBuffer = new StringBuffer();
427 int exitValue = drainBuffers(p, inputBuffer, errorBuffer, maxLen);
428
429 pid = getFirstLine(inputBuffer);
430
431 String error = null;
432 if (exitValue != 0) {
433 error = getTruncatedString(errorBuffer);
434 throw new IOException(XLog.format("Not able to execute ssh-base.sh on {0}", host) + " | " + "ErrorStream: "
435 + error);
436 }
437 return pid;
438 }
439
440 /**
441 * End action execution.
442 *
443 * @param context action execution context.
444 * @param action action object.
445 * @throws ActionExecutorException thrown if action end execution fails.
446 */
447 public void end(final Context context, final WorkflowAction action) throws ActionExecutorException {
448 if (action.getExternalStatus().equals("OK")) {
449 context.setEndData(WorkflowAction.Status.OK, WorkflowAction.Status.OK.toString());
450 }
451 else {
452 context.setEndData(WorkflowAction.Status.ERROR, WorkflowAction.Status.ERROR.toString());
453 }
454 boolean deleteTmpDir = getOozieConf().getBoolean(DELETE_TMP_DIR, true);
455 if (deleteTmpDir) {
456 String tmpDir = getRemoteFileName(context, action, null, true, false);
457 String removeTmpDirCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " rm -rf " + tmpDir;
458 int retVal = getReturnValue(removeTmpDirCmd);
459 if (retVal != 0) {
460 XLog.getLog(getClass()).warn("Cannot delete temp dir {0}", tmpDir);
461 }
462 }
463 }
464
465 /**
466 * Get the return value of a process.
467 *
468 * @param command command to be executed.
469 * @return zero if execution is successful and any non zero value for failure.
470 * @throws ActionExecutorException
471 */
472 private int getReturnValue(String command) throws ActionExecutorException {
473 int returnValue;
474 Process ps = null;
475 try {
476 ps = Runtime.getRuntime().exec(command.split("\\s"));
477 returnValue = drainBuffers(ps, null, null, 0);
478 }
479 catch (IOException e) {
480 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog.format(
481 "Not able to perform operation {0}", command), e);
482 }
483 finally {
484 ps.destroy();
485 }
486 return returnValue;
487 }
488
489 /**
490 * Copy the ssh base and wrapper scripts to the local directory.
491 */
492 private void initSshScripts() {
493 String dirLocation = Services.get().getRuntimeDir() + "/ssh";
494 File path = new File(dirLocation);
495 path.mkdirs();
496 if (!path.exists()) {
497 throw new RuntimeException(XLog.format("Not able to create required directory {0}", dirLocation));
498 }
499 try {
500 IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-base.sh", -1), new FileWriter(dirLocation
501 + "/ssh-base.sh"));
502 IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-wrapper.sh", -1), new FileWriter(dirLocation
503 + "/ssh-wrapper.sh"));
504 }
505 catch (IOException ie) {
506 throw new RuntimeException(XLog.format("Not able to copy required scripts file to {0} "
507 + "for SshActionHandler", dirLocation));
508 }
509 }
510
511 /**
512 * Get action status.
513 *
514 * @param action action object.
515 * @return status of the action(RUNNING/OK/ERROR).
516 * @throws ActionExecutorException thrown if there is any error in getting status.
517 */
518 protected Status getActionStatus(Context context, WorkflowAction action) throws ActionExecutorException {
519 String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " + action.getExternalId();
520 Status aStatus;
521 int returnValue = getReturnValue(command);
522 if (returnValue == 0) {
523 aStatus = Status.RUNNING;
524 }
525 else {
526 String outFile = getRemoteFileName(context, action, "error", false, true);
527 String checkErrorCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " ls " + outFile;
528 int retVal = getReturnValue(checkErrorCmd);
529 if (retVal == 0) {
530 aStatus = Status.ERROR;
531 }
532 else {
533 aStatus = Status.OK;
534 }
535 }
536 return aStatus;
537 }
538
539 /**
540 * Execute the callable.
541 *
542 * @param callable required callable.
543 * @throws ActionExecutorException thrown if there is any error in command execution.
544 */
545 private <T> T execute(Callable<T> callable) throws ActionExecutorException {
546 XLog log = XLog.getLog(getClass());
547 try {
548 return callable.call();
549 }
550 catch (IOException ex) {
551 log.warn("Error while executing ssh EXECUTION");
552 String errorMessage = ex.getMessage();
553 if (null == errorMessage) { // Unknown IOException
554 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex
555 .getMessage(), ex);
556 } // Host Resolution Issues
557 else {
558 if (errorMessage.contains("Could not resolve hostname") ||
559 errorMessage.contains("service not known")) {
560 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_HOST_RESOLUTION, ex
561 .getMessage(), ex);
562 } // Connection Timeout. Host temporarily down.
563 else {
564 if (errorMessage.contains("timed out")) {
565 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_COULD_NOT_CONNECT,
566 ex.getMessage(), ex);
567 }// Local ssh-base or ssh-wrapper missing
568 else {
569 if (errorMessage.contains("Required Local file")) {
570 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
571 ex.getMessage(), ex); // local_FNF
572 }// Required oozie bash scripts missing, after the copy was
573 // successful
574 else {
575 if (errorMessage.contains("No such file or directory")
576 && (errorMessage.contains("ssh-base") || errorMessage.contains("ssh-wrapper"))) {
577 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
578 ex.getMessage(), ex); // remote
579 // FNF
580 } // Required application execution binary missing (either
581 // caught by ssh-wrapper
582 else {
583 if (errorMessage.contains("command not found")) {
584 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_FNF, ex
585 .getMessage(), ex); // remote
586 // FNF
587 } // Permission denied while connecting
588 else {
589 if (errorMessage.contains("Permission denied")) {
590 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT,
591 ERR_AUTH_FAILED, ex.getMessage(), ex);
592 } // Permission denied while executing
593 else {
594 if (errorMessage.contains(": Permission denied")) {
595 throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT,
596 ERR_NO_EXEC_PERM, ex.getMessage(), ex);
597 }
598 else {
599 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR,
600 ERR_UNKNOWN_ERROR, ex.getMessage(), ex);
601 }
602 }
603 }
604 }
605 }
606 }
607 }
608 }
609 } // Any other type of exception
610 catch (Exception ex) {
611 throw convertException(ex);
612 }
613 }
614
615 /**
616 * Checks whether the system is configured to always use the oozie user for ssh, and injects the user if required.
617 *
618 * @param host the host string.
619 * @param context the execution context.
620 * @return the modified host string with a user parameter added on if required.
621 * @throws ActionExecutorException in case the flag to use the oozie user is turned on and there is a mismatch
622 * between the user specified in the host and the oozie user.
623 */
624 private String prepareUserHost(String host, Context context) throws ActionExecutorException {
625 String oozieUser = context.getProtoActionConf().get(OozieClient.USER_NAME);
626 if (allowSshUserAtHost) {
627 if (!host.contains("@")) {
628 host = oozieUser + "@" + host;
629 }
630 }
631 else {
632 if (host.contains("@")) {
633 if (!host.toLowerCase().startsWith(oozieUser + "@")) {
634 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_USER_MISMATCH,
635 XLog.format("user mismatch between oozie user [{0}] and ssh host [{1}]",
636 oozieUser, host));
637 }
638 }
639 else {
640 host = oozieUser + "@" + host;
641 }
642 }
643 return host;
644 }
645
646 @Override
647 public boolean isCompleted(String externalStatus) {
648 return true;
649 }
650
651 /**
652 * Truncate the string to max length.
653 *
654 * @param strBuffer
655 * @return truncated string string
656 */
657 private String getTruncatedString(StringBuffer strBuffer) {
658
659 if (strBuffer.length() <= maxLen) {
660 return strBuffer.toString();
661 }
662 else {
663 return strBuffer.substring(0, maxLen);
664 }
665 }
666
667 /**
668 * Drains the inputStream and errorStream of the Process being executed. The contents of the streams are stored if a
669 * buffer is provided for the stream.
670 *
671 * @param p The Process instance.
672 * @param inputBuffer The buffer into which STDOUT is to be read. Can be null if only draining is required.
673 * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required.
674 * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the
675 * store content may exceed this length.
676 * @return the exit value of the process.
677 * @throws IOException
678 */
679 private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength)
680 throws IOException {
681 int exitValue = -1;
682 BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream()));
683 BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream()));
684
685 int inBytesRead = 0;
686 int errBytesRead = 0;
687
688 boolean processEnded = false;
689
690 try {
691 while (!processEnded) {
692 try {
693 exitValue = p.exitValue();
694 processEnded = true;
695 }
696 catch (IllegalThreadStateException ex) {
697 // Continue to drain.
698 }
699
700 inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded);
701 errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded);
702 }
703 }
704 finally {
705 ir.close();
706 er.close();
707 }
708 return exitValue;
709 }
710
711 /**
712 * Reads the contents of a stream and stores them into the provided buffer.
713 *
714 * @param br The stream to be read.
715 * @param storageBuf The buffer into which the contents of the stream are to be stored.
716 * @param maxLength The maximum number of bytes to be stored in the buffer. An indicative value and may be
717 * exceeded.
718 * @param bytesRead The number of bytes read from this stream to date.
719 * @param readAll If true, the stream is drained while their is data available in it. Otherwise, only a single chunk
720 * of data is read, irrespective of how much is available.
721 * @return
722 * @throws IOException
723 */
724 private int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll)
725 throws IOException {
726 int bReadSession = 0;
727 if (br.ready()) {
728 char[] buf = new char[1024];
729 do {
730 int bReadCurrent = br.read(buf, 0, 1024);
731 if (storageBuf != null && bytesRead < maxLength) {
732 storageBuf.append(buf, 0, bReadCurrent);
733 }
734 bReadSession += bReadCurrent;
735 } while (br.ready() && readAll);
736 }
737 return bReadSession;
738 }
739
740 /**
741 * Returns the first line from a StringBuffer, recognized by the new line character \n.
742 *
743 * @param buffer The StringBuffer from which the first line is required.
744 * @return The first line of the buffer.
745 */
746 private String getFirstLine(StringBuffer buffer) {
747 int newLineIndex = 0;
748 newLineIndex = buffer.indexOf("\n");
749 if (newLineIndex == -1) {
750 return buffer.toString();
751 }
752 else {
753 return buffer.substring(0, newLineIndex);
754 }
755 }
756 }