018    package org.apache.oozie.action.ssh;
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileWriter;
023    import java.io.IOException;
024    import java.io.InputStreamReader;
025    import java.util.Arrays;
026    import java.util.List;
027    import java.util.concurrent.Callable;
028    import org.apache.hadoop.util.StringUtils;
030    import org.apache.oozie.client.WorkflowAction;
031    import org.apache.oozie.client.OozieClient;
032    import org.apache.oozie.client.WorkflowAction.Status;
033    import org.apache.oozie.action.ActionExecutor;
034    import org.apache.oozie.action.ActionExecutorException;
035    import org.apache.oozie.service.CallbackService;
036    import org.apache.oozie.servlet.CallbackServlet;
037    import org.apache.oozie.service.Services;
038    import org.apache.oozie.util.IOUtils;
039    import org.apache.oozie.util.PropertiesUtils;
040    import org.apache.oozie.util.XLog;
041    import org.apache.oozie.util.XmlUtils;
042    import org.jdom.Element;
043    import org.jdom.JDOMException;
044    import org.jdom.Namespace;
046    /**
047     * Ssh action executor. <p/> <ul> <li>Execute the shell commands on the remote host</li> <li>Copies the base and wrapper
048     * scripts on to the remote location</li> <li>Base script is used to run the command on the remote host</li> <li>Wrapper
049     * script is used to check the status of the submitted command</li> <li>handles the submission failures</li> </ul>
050     */
051    public class SshActionExecutor extends ActionExecutor {
052        public static final String ACTION_TYPE = "ssh";
054        /**
055         * Configuration parameter which specifies whether the specified ssh user is allowed, or has to be the job user.
056         */
057        public static final String CONF_SSH_ALLOW_USER_AT_HOST = CONF_PREFIX + "ssh.allow.user.at.host";
           // Options shared by every ssh/scp invocation built from the two *_COMMAND_BASE constants below. The value ends
           // with a space so that the target host can be appended directly.
059        protected static final String SSH_COMMAND_OPTIONS =
060                "-o PasswordAuthentication=no -o KbdInteractiveDevices=no -o StrictHostKeyChecking=no -o ConnectTimeout=20 ";
062        protected static final String SSH_COMMAND_BASE = "ssh " + SSH_COMMAND_OPTIONS;
063        protected static final String SCP_COMMAND_BASE = "scp " + SSH_COMMAND_OPTIONS;
           // Error codes. Several of them are passed to ActionExecutorException by execute() and prepareUserHost().
065        public static final String ERR_SETUP_FAILED = "SETUP_FAILED";
066        public static final String ERR_EXECUTION_FAILED = "EXECUTION_FAILED";
           // NOTE(review): the value is spelled "UNKOWN_ERROR" (missing 'N'); it is a runtime string, so it is left as-is.
067        public static final String ERR_UNKNOWN_ERROR = "UNKOWN_ERROR";
068        public static final String ERR_COULD_NOT_CONNECT = "COULD_NOT_CONNECT";
069        public static final String ERR_HOST_RESOLUTION = "COULD_NOT_RESOLVE_HOST";
070        public static final String ERR_FNF = "FNF";
071        public static final String ERR_AUTH_FAILED = "AUTH_FAILED";
072        public static final String ERR_NO_EXEC_PERM = "NO_EXEC_PERM";
073        public static final String ERR_USER_MISMATCH = "ERR_USER_MISMATCH";
074        public static final String ERR_EXCEDE_LEN = "ERR_OUTPUT_EXCEED_MAX_LEN";
           // Boolean configuration key read by end() (default true) to decide whether the remote directory is removed.
076        public static final String DELETE_TMP_DIR = "oozie.action.ssh.delete.remote.tmp.dir";
           // Configuration key whose value doExecute() passes on the ssh-base.sh command line.
078        public static final String HTTP_COMMAND = "oozie.action.ssh.http.command";
           // Configuration key read by doExecute() when output is captured; spaces in its value are replaced by "%%%".
080        public static final String HTTP_COMMAND_OPTIONS = "oozie.action.ssh.http.command.post.options";
           // Passed by doExecute() to CallbackService.createCallBackUrl() together with the action id.
082        private static final String EXT_STATUS_VAR = "#status";
           // Both static fields below are assigned from the Oozie configuration in initActionType().
084        private static int maxLen;
085        private static boolean allowSshUserAtHost;
087        protected SshActionExecutor() {
               // Registers this executor under the "ssh" action type.
088            super(ACTION_TYPE);
089        }
091        /**
092         * Initialize Action.
093         */
094        @Override
095        public void initActionType() {
096            super.initActionType();
097            maxLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024);
098            allowSshUserAtHost = getOozieConf().getBoolean(CONF_SSH_ALLOW_USER_AT_HOST, true);
099            registerError(InterruptedException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH001");
100            registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH002");
101            initSshScripts();
102        }
104        /**
105         * Check ssh action status.
106         *
107         * @param context action execution context.
108         * @param action action object.
109         */
110        @Override
111        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
112            Status status = getActionStatus(context, action);
113            boolean captureOutput = false;
114            try {
115                Element eConf = XmlUtils.parseXml(action.getConf());
116                Namespace ns = eConf.getNamespace();
117                captureOutput = eConf.getChild("capture-output", ns) != null;
118            }
119            catch (JDOMException ex) {
120                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_XML_PARSE_FAILED",
121                                                  "unknown error", ex);
122            }
123            XLog log = XLog.getLog(getClass());
124            log.debug("Capture Output: {0}", captureOutput);
125            if (status == Status.OK) {
126                if (captureOutput) {
127                    String outFile = getRemoteFileName(context, action, "stdout", false, true);
128                    String dataCommand = SSH_COMMAND_BASE + action.getTrackerUri() + " cat " + outFile;
129                    log.debug("Ssh command [{0}]", dataCommand);
130                    try {
131                        Process process = Runtime.getRuntime().exec(dataCommand.split("\\s"));
132                        StringBuffer buffer = new StringBuffer();
133                        boolean overflow = false;
134                        drainBuffers(process, buffer, null, maxLen);
135                        if (buffer.length() > maxLen) {
136                            overflow = true;
137                        }
138                        if (overflow) {
139                            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR,
140                                                              "ERR_OUTPUT_EXCEED_MAX_LEN", "unknown error");
141                        }
142                        context.setExecutionData(status.toString(), PropertiesUtils.stringToProperties(buffer.toString()));
143                    }
144                    catch (Exception ex) {
145                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_UNKNOWN_ERROR",
146                                                          "unknown error", ex);
147                    }
148                }
149                else {
150                    context.setExecutionData(status.toString(), null);
151                }
152            }
153            else {
154                if (status == Status.ERROR) {
155                    context.setExecutionData(status.toString(), null);
156                }
157                else {
158                    context.setExternalStatus(status.toString());
159                }
160            }
161        }
163        /**
164         * Kill ssh action.
165         *
166         * @param context action execution context.
167         * @param action object.
168         */
169        @Override
170        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
171            String command = "ssh " + action.getTrackerUri() + " kill  -KILL " + action.getExternalId();
172            int returnValue = getReturnValue(command);
173            if (returnValue != 0) {
174                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_TO_KILL", XLog.format(
175                        "Unable to kill process {0} on {1}", action.getExternalId(), action.getTrackerUri()));
176            }
177            context.setEndData(WorkflowAction.Status.KILLED, "ERROR");
178        }
180        /**
181         * Start the ssh action execution.
182         *
183         * @param context action execution context.
184         * @param action action object.
185         */
186        @SuppressWarnings("unchecked")
187        @Override
188        public void start(final Context context, final WorkflowAction action) throws ActionExecutorException {
189            XLog log = XLog.getLog(getClass());
190            log.info("start() begins");
191            String confStr = action.getConf();
192            Element conf;
193            try {
194                conf = XmlUtils.parseXml(confStr);
195            }
196            catch (Exception ex) {
197                throw convertException(ex);
198            }
199            Namespace nameSpace = conf.getNamespace();
200            Element hostElement = conf.getChild("host", nameSpace);
201            String hostString = hostElement.getValue().trim();
202            hostString = prepareUserHost(hostString, context);
203            final String host = hostString;
204            final String dirLocation = execute(new Callable<String>() {
205                public String call() throws Exception {
206                    return setupRemote(host, context, action);
207                }
209            });
211            String runningPid = execute(new Callable<String>() {
212                public String call() throws Exception {
213                    return checkIfRunning(host, context, action);
214                }
215            });
216            String pid = "";
218            if (runningPid == null) {
219                final Element commandElement = conf.getChild("command", nameSpace);
220                final boolean ignoreOutput = conf.getChild("capture-output", nameSpace) == null;
222                boolean preserve = false;
223                if (commandElement != null) {
224                    String[] args = null;
225                    // Will either have <args>, <arg>, or neither (but not both)
226                    List<Element> argsList = conf.getChildren("args", nameSpace);
227                    // Arguments in an <args> are "flattened" (spaces are delimiters)
228                    if (argsList != null && argsList.size() > 0) {
229                        StringBuilder argsString = new StringBuilder("");
230                        for (Element argsElement : argsList) {
231                            argsString = argsString.append(argsElement.getValue()).append(" ");
232                        }
233                        args = new String[]{argsString.toString()};
234                    }
235                    else {
236                        // Arguments in an <arg> are preserved, even with spaces
237                        argsList = conf.getChildren("arg", nameSpace);
238                        if (argsList != null && argsList.size() > 0) {
239                            preserve = true;
240                            args = new String[argsList.size()];
241                            for (int i = 0; i < argsList.size(); i++) {
242                                Element argsElement = argsList.get(i);
243                                args[i] = argsElement.getValue();
244                                // Even though we're keeping the args as an array, if they contain a space we still have to either quote
245                                // them or escape their space (because the scripts will split them up otherwise)
246                                if (args[i].contains(" ") &&
247                                        !(args[i].startsWith("\"") && args[i].endsWith("\"") ||
248                                          args[i].startsWith("'") && args[i].endsWith("'"))) {
249                                    args[i] = StringUtils.escapeString(args[i], '\\', ' ');
250                                }
251                            }
252                        }
253                    }
254                    final String[] argsF = args;
255                    final String recoveryId = context.getRecoveryId();
256                    final boolean preserveF = preserve;
257                    pid = execute(new Callable<String>() {
259                        @Override
260                        public String call() throws Exception {
261                            return doExecute(host, dirLocation, commandElement.getValue(), argsF, ignoreOutput, action, recoveryId,
262                                    preserveF);
263                        }
265                    });
266                }
267                context.setStartData(pid, host, host);
268            }
269            else {
270                pid = runningPid;
271                context.setStartData(pid, host, host);
272                check(context, action);
273            }
274            log.info("start() ends");
275        }
277        private String checkIfRunning(String host, final Context context, final WorkflowAction action) {
278            String pid = null;
279            String outFile = getRemoteFileName(context, action, "pid", false, false);
280            String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile;
281            try {
282                Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s"));
283                StringBuffer buffer = new StringBuffer();
284                drainBuffers(process, buffer, null, maxLen);
285                pid = getFirstLine(buffer);
287                if (Long.valueOf(pid) > 0) {
288                    return pid;
289                }
290                else {
291                    return null;
292                }
293            }
294            catch (Exception e) {
295                return null;
296            }
297        }
299        /**
300         * Get remote host working location.
301         *
302         * @param context action execution context
303         * @param action Action
304         * @param fileExtension Extension to be added to file name
305         * @param dirOnly Get the Directory only
306         * @param useExtId Flag to use external ID in the path
307         * @return remote host file name/Directory.
308         */
309        public String getRemoteFileName(Context context, WorkflowAction action, String fileExtension, boolean dirOnly,
310                                        boolean useExtId) {
311            String path = getActionDirPath(context.getWorkflow().getId(), action, ACTION_TYPE, false) + "/";
312            if (dirOnly) {
313                return path;
314            }
315            if (useExtId) {
316                path = path + action.getExternalId() + ".";
317            }
318            path = path + context.getRecoveryId() + "." + fileExtension;
319            return path;
320        }
322        /**
323         * Utility method to execute command.
324         *
325         * @param command Command to execute as String.
326         * @return exit status of the execution.
327         * @throws IOException if process exits with status nonzero.
328         * @throws InterruptedException if process does not run properly.
329         */
330        public int executeCommand(String command) throws IOException, InterruptedException {
331            Runtime runtime = Runtime.getRuntime();
332            Process p = runtime.exec(command.split("\\s"));
334            StringBuffer errorBuffer = new StringBuffer();
335            int exitValue = drainBuffers(p, null, errorBuffer, maxLen);
337            String error = null;
338            if (exitValue != 0) {
339                error = getTruncatedString(errorBuffer);
340                throw new IOException(XLog.format("Not able to perform operation [{0}]", command) + " | " + "ErrorStream: "
341                        + error);
342            }
343            return exitValue;
344        }
346        /**
347         * Do ssh action execution setup on remote host.
348         *
349         * @param host host name.
350         * @param context action execution context.
351         * @param action action object.
352         * @return remote host working directory.
353         * @throws IOException thrown if failed to setup.
354         * @throws InterruptedException thrown if any interruption happens.
355         */
356        protected String setupRemote(String host, Context context, WorkflowAction action) throws IOException, InterruptedException {
357            XLog log = XLog.getLog(getClass());
358            log.info("Attempting to copy ssh base scripts to remote host [{0}]", host);
359            String localDirLocation = Services.get().getRuntimeDir() + "/ssh";
360            if (localDirLocation.endsWith("/")) {
361                localDirLocation = localDirLocation.substring(0, localDirLocation.length() - 1);
362            }
363            File file = new File(localDirLocation + "/ssh-base.sh");
364            if (!file.exists()) {
365                throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
366            }
367            file = new File(localDirLocation + "/ssh-wrapper.sh");
368            if (!file.exists()) {
369                throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
370            }
371            String remoteDirLocation = getRemoteFileName(context, action, null, true, true);
372            String command = XLog.format("{0}{1}  mkdir -p {2} ", SSH_COMMAND_BASE, host, remoteDirLocation).toString();
373            executeCommand(command);
374            command = XLog.format("{0}{1}/ssh-base.sh {2}/ssh-wrapper.sh {3}:{4}", SCP_COMMAND_BASE, localDirLocation,
375                                  localDirLocation, host, remoteDirLocation);
376            executeCommand(command);
377            command = XLog.format("{0}{1}  chmod +x {2}ssh-base.sh {3}ssh-wrapper.sh ", SSH_COMMAND_BASE, host,
378                                  remoteDirLocation, remoteDirLocation);
379            executeCommand(command);
380            return remoteDirLocation;
381        }
383        /**
384         * Execute the ssh command.
385         *
386         * @param host hostname.
387         * @param dirLocation location of the base and wrapper scripts.
388         * @param cmnd command to be executed.
389         * @param args command arguments.
390         * @param ignoreOutput ignore output option.
391         * @param action action object.
392         * @param recoveryId action id + run number to enable recovery in rerun
393         * @param preserveArgs tell the ssh scripts to preserve or flatten the arguments
394         * @return process id of the running command.
395         * @throws IOException thrown if failed to run the command.
396         * @throws InterruptedException thrown if any interruption happens.
397         */
398        protected String doExecute(String host, String dirLocation, String cmnd, String[] args, boolean ignoreOutput,
399                                   WorkflowAction action, String recoveryId, boolean preserveArgs)
400                                   throws IOException, InterruptedException {
401            XLog log = XLog.getLog(getClass());
402            Runtime runtime = Runtime.getRuntime();
403            String callbackPost = ignoreOutput ? "_" : getOozieConf().get(HTTP_COMMAND_OPTIONS).replace(" ", "%%%");
404            String preserveArgsS = preserveArgs ? "PRESERVE_ARGS" : "FLATTEN_ARGS";
405            // TODO check
406            String callBackUrl = Services.get().get(CallbackService.class)
407                    .createCallBackUrl(action.getId(), EXT_STATUS_VAR);
408            String command = XLog.format("{0}{1} {2}ssh-base.sh {3} {4} \"{5}\" \"{6}\" {7} {8} ", SSH_COMMAND_BASE, host, dirLocation,
409                                          preserveArgsS, getOozieConf().get(HTTP_COMMAND), callBackUrl, callbackPost, recoveryId, cmnd)
410                    .toString();
411            String[] commandArray = command.split("\\s");
412            String[] finalCommand;
413            if (args == null) {
414                finalCommand = commandArray;
415            }
416            else {
417                finalCommand = new String[commandArray.length + args.length];
418                System.arraycopy(commandArray, 0, finalCommand, 0, commandArray.length);
419                System.arraycopy(args, 0, finalCommand, commandArray.length, args.length);
420            }
421            log.trace("Executing ssh command [{0}]", Arrays.toString(finalCommand));
422            Process p = runtime.exec(finalCommand);
423            String pid = "";
425            StringBuffer inputBuffer = new StringBuffer();
426            StringBuffer errorBuffer = new StringBuffer();
427            int exitValue = drainBuffers(p, inputBuffer, errorBuffer, maxLen);
429            pid = getFirstLine(inputBuffer);
431            String error = null;
432            if (exitValue != 0) {
433                error = getTruncatedString(errorBuffer);
434                throw new IOException(XLog.format("Not able to execute ssh-base.sh on {0}", host) + " | " + "ErrorStream: "
435                        + error);
436            }
437            return pid;
438        }
440        /**
441         * End action execution.
442         *
443         * @param context action execution context.
444         * @param action action object.
445         * @throws ActionExecutorException thrown if action end execution fails.
446         */
447        public void end(final Context context, final WorkflowAction action) throws ActionExecutorException {
448            if (action.getExternalStatus().equals("OK")) {
449                context.setEndData(WorkflowAction.Status.OK, WorkflowAction.Status.OK.toString());
450            }
451            else {
452                context.setEndData(WorkflowAction.Status.ERROR, WorkflowAction.Status.ERROR.toString());
453            }
454            boolean deleteTmpDir = getOozieConf().getBoolean(DELETE_TMP_DIR, true);
455            if (deleteTmpDir) {
456                String tmpDir = getRemoteFileName(context, action, null, true, false);
457                String removeTmpDirCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " rm -rf " + tmpDir;
458                int retVal = getReturnValue(removeTmpDirCmd);
459                if (retVal != 0) {
460                    XLog.getLog(getClass()).warn("Cannot delete temp dir {0}", tmpDir);
461                }
462            }
463        }
465        /**
466         * Get the return value of a process.
467         *
468         * @param command command to be executed.
469         * @return zero if execution is successful and any non zero value for failure.
470         * @throws ActionExecutorException
471         */
472        private int getReturnValue(String command) throws ActionExecutorException {
473            int returnValue;
474            Process ps = null;
475            try {
476                ps = Runtime.getRuntime().exec(command.split("\\s"));
477                returnValue = drainBuffers(ps, null, null, 0);
478            }
479            catch (IOException e) {
480                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog.format(
481                        "Not able to perform operation {0}", command), e);
482            }
483            finally {
484                ps.destroy();
485            }
486            return returnValue;
487        }
489        /**
490         * Copy the ssh base and wrapper scripts to the local directory.
491         */
492        private void initSshScripts() {
493            String dirLocation = Services.get().getRuntimeDir() + "/ssh";
494            File path = new File(dirLocation);
495            path.mkdirs();
496            if (!path.exists()) {
497                throw new RuntimeException(XLog.format("Not able to create required directory {0}", dirLocation));
498            }
499            try {
500                IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-base.sh", -1), new FileWriter(dirLocation
501                        + "/ssh-base.sh"));
502                IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-wrapper.sh", -1), new FileWriter(dirLocation
503                        + "/ssh-wrapper.sh"));
504            }
505            catch (IOException ie) {
506                throw new RuntimeException(XLog.format("Not able to copy required scripts file to {0} "
507                        + "for SshActionHandler", dirLocation));
508            }
509        }
511        /**
512         * Get action status.
513         *
514         * @param action action object.
515         * @return status of the action(RUNNING/OK/ERROR).
516         * @throws ActionExecutorException thrown if there is any error in getting status.
517         */
518        protected Status getActionStatus(Context context, WorkflowAction action) throws ActionExecutorException {
519            String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " + action.getExternalId();
520            Status aStatus;
521            int returnValue = getReturnValue(command);
522            if (returnValue == 0) {
523                aStatus = Status.RUNNING;
524            }
525            else {
526                String outFile = getRemoteFileName(context, action, "error", false, true);
527                String checkErrorCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " ls " + outFile;
528                int retVal = getReturnValue(checkErrorCmd);
529                if (retVal == 0) {
530                    aStatus = Status.ERROR;
531                }
532                else {
533                    aStatus = Status.OK;
534                }
535            }
536            return aStatus;
537        }
539        /**
540         * Execute the callable.
541         *
542         * @param callable required callable.
543         * @throws ActionExecutorException thrown if there is any error in command execution.
544         */
545        private <T> T execute(Callable<T> callable) throws ActionExecutorException {
546            XLog log = XLog.getLog(getClass());
547            try {
548                return callable.call();
549            }
550            catch (IOException ex) {
551                log.warn("Error while executing ssh EXECUTION");
552                String errorMessage = ex.getMessage();
553                if (null == errorMessage) { // Unknown IOException
554                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex
555                            .getMessage(), ex);
556                } // Host Resolution Issues
557                else {
558                    if (errorMessage.contains("Could not resolve hostname") ||
559                            errorMessage.contains("service not known")) {
560                        throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_HOST_RESOLUTION, ex
561                                .getMessage(), ex);
562                    } // Connection Timeout. Host temporarily down.
563                    else {
564                        if (errorMessage.contains("timed out")) {
565                            throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_COULD_NOT_CONNECT,
566                                                              ex.getMessage(), ex);
567                        }// Local ssh-base or ssh-wrapper missing
568                        else {
569                            if (errorMessage.contains("Required Local file")) {
570                                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
571                                                                  ex.getMessage(), ex); // local_FNF
572                            }// Required oozie bash scripts missing, after the copy was
573                            // successful
574                            else {
575                                if (errorMessage.contains("No such file or directory")
576                                        && (errorMessage.contains("ssh-base") || errorMessage.contains("ssh-wrapper"))) {
577                                    throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
578                                                                      ex.getMessage(), ex); // remote
579                                    // FNF
580                                } // Required application execution binary missing (either
581                                // caught by ssh-wrapper
582                                else {
583                                    if (errorMessage.contains("command not found")) {
584                                        throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_FNF, ex
585                                                .getMessage(), ex); // remote
586                                        // FNF
587                                    } // Permission denied while connecting
588                                    else {
589                                        if (errorMessage.contains("Permission denied")) {
590                                            throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT,
591                                                    ERR_AUTH_FAILED, ex.getMessage(), ex);
592                                        } // Permission denied while executing
593                                        else {
594                                            if (errorMessage.contains(": Permission denied")) {
595                                                throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT,
596                                                        ERR_NO_EXEC_PERM, ex.getMessage(), ex);
597                                            }
598                                            else {
599                                                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR,
600                                                        ERR_UNKNOWN_ERROR, ex.getMessage(), ex);
601                                            }
602                                        }
603                                    }
604                                }
605                            }
606                        }
607                    }
608                }
609            } // Any other type of exception
610            catch (Exception ex) {
611                throw convertException(ex);
612            }
613        }
615        /**
616         * Checks whether the system is configured to always use the oozie user for ssh, and injects the user if required.
617         *
618         * @param host the host string.
619         * @param context the execution context.
620         * @return the modified host string with a user parameter added on if required.
621         * @throws ActionExecutorException in case the flag to use the oozie user is turned on and there is a mismatch
622         * between the user specified in the host and the oozie user.
623         */
624        private String prepareUserHost(String host, Context context) throws ActionExecutorException {
625            String oozieUser = context.getProtoActionConf().get(OozieClient.USER_NAME);
626            if (allowSshUserAtHost) {
627                if (!host.contains("@")) {
628                    host = oozieUser + "@" + host;
629                }
630            }
631            else {
632                if (host.contains("@")) {
633                    if (!host.toLowerCase().startsWith(oozieUser + "@")) {
634                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_USER_MISMATCH,
635                                                          XLog.format("user mismatch between oozie user [{0}] and ssh host [{1}]",
636                                                                  oozieUser, host));
637                    }
638                }
639                else {
640                    host = oozieUser + "@" + host;
641                }
642            }
643            return host;
644        }
    /**
     * Completion check for an external status value.
     * <p/>
     * This implementation does not inspect {@code externalStatus}; it answers {@code true} unconditionally.
     *
     * @param externalStatus the external status; ignored by this implementation.
     * @return always {@code true}.
     */
    @Override
    public boolean isCompleted(String externalStatus) {
        return true;
    }
651        /**
652         * Truncate the string to max length.
653         *
654         * @param strBuffer
655         * @return truncated string string
656         */
657        private String getTruncatedString(StringBuffer strBuffer) {
659            if (strBuffer.length() <= maxLen) {
660                return strBuffer.toString();
661            }
662            else {
663                return strBuffer.substring(0, maxLen);
664            }
665        }
667        /**
668         * Drains the inputStream and errorStream of the Process being executed. The contents of the streams are stored if a
669         * buffer is provided for the stream.
670         *
671         * @param p The Process instance.
672         * @param inputBuffer The buffer into which STDOUT is to be read. Can be null if only draining is required.
673         * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required.
674         * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the
675         * store content may exceed this length.
676         * @return the exit value of the process.
677         * @throws IOException
678         */
679        private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength)
680                throws IOException {
681            int exitValue = -1;
682            BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream()));
683            BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream()));
685            int inBytesRead = 0;
686            int errBytesRead = 0;
688            boolean processEnded = false;
690            try {
691                while (!processEnded) {
692                    try {
693                        exitValue = p.exitValue();
694                        processEnded = true;
695                    }
696                    catch (IllegalThreadStateException ex) {
697                        // Continue to drain.
698                    }
700                    inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded);
701                    errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded);
702                }
703            }
704            finally {
705                ir.close();
706                er.close();
707            }
708            return exitValue;
709        }
711        /**
712         * Reads the contents of a stream and stores them into the provided buffer.
713         *
714         * @param br The stream to be read.
715         * @param storageBuf The buffer into which the contents of the stream are to be stored.
716         * @param maxLength The maximum number of bytes to be stored in the buffer. An indicative value and may be
717         * exceeded.
718         * @param bytesRead The number of bytes read from this stream to date.
719         * @param readAll If true, the stream is drained while their is data available in it. Otherwise, only a single chunk
720         * of data is read, irrespective of how much is available.
721         * @return
722         * @throws IOException
723         */
724        private int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll)
725                throws IOException {
726            int bReadSession = 0;
727            if (br.ready()) {
728                char[] buf = new char[1024];
729                do {
730                    int bReadCurrent = br.read(buf, 0, 1024);
731                    if (storageBuf != null && bytesRead < maxLength) {
732                        storageBuf.append(buf, 0, bReadCurrent);
733                    }
734                    bReadSession += bReadCurrent;
735                } while (br.ready() && readAll);
736            }
737            return bReadSession;
738        }
740        /**
741         * Returns the first line from a StringBuffer, recognized by the new line character \n.
742         *
743         * @param buffer The StringBuffer from which the first line is required.
744         * @return The first line of the buffer.
745         */
746        private String getFirstLine(StringBuffer buffer) {
747            int newLineIndex = 0;
748            newLineIndex = buffer.indexOf("\n");
749            if (newLineIndex == -1) {
750                return buffer.toString();
751            }
752            else {
753                return buffer.substring(0, newLineIndex);
754            }
755        }
756    }