001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.ssh;
019
020import java.io.BufferedReader;
021import java.io.File;
022import java.io.FileWriter;
023import java.io.IOException;
024import java.io.InputStreamReader;
025import java.util.Arrays;
026import java.util.List;
027import java.util.concurrent.Callable;
028import org.apache.hadoop.util.StringUtils;
029
030import org.apache.oozie.client.WorkflowAction;
031import org.apache.oozie.client.OozieClient;
032import org.apache.oozie.client.WorkflowAction.Status;
033import org.apache.oozie.action.ActionExecutor;
034import org.apache.oozie.action.ActionExecutorException;
035import org.apache.oozie.service.CallbackService;
036import org.apache.oozie.servlet.CallbackServlet;
037import org.apache.oozie.service.Services;
038import org.apache.oozie.util.IOUtils;
039import org.apache.oozie.util.PropertiesUtils;
040import org.apache.oozie.util.XLog;
041import org.apache.oozie.util.XmlUtils;
042import org.jdom.Element;
043import org.jdom.JDOMException;
044import org.jdom.Namespace;
045
046/**
047 * Ssh action executor. <p/> <ul> <li>Execute the shell commands on the remote host</li> <li>Copies the base and wrapper
048 * scripts on to the remote location</li> <li>Base script is used to run the command on the remote host</li> <li>Wrapper
049 * script is used to check the status of the submitted command</li> <li>handles the submission failures</li> </ul>
050 */
051public class SshActionExecutor extends ActionExecutor {
052    public static final String ACTION_TYPE = "ssh";
053
054    /**
055     * Configuration parameter which specifies whether the specified ssh user is allowed, or has to be the job user.
056     */
057    public static final String CONF_SSH_ALLOW_USER_AT_HOST = CONF_PREFIX + "ssh.allow.user.at.host";
058
059    protected static final String SSH_COMMAND_OPTIONS =
060            "-o PasswordAuthentication=no -o KbdInteractiveDevices=no -o StrictHostKeyChecking=no -o ConnectTimeout=20 ";
061
062    protected static final String SSH_COMMAND_BASE = "ssh " + SSH_COMMAND_OPTIONS;
063    protected static final String SCP_COMMAND_BASE = "scp " + SSH_COMMAND_OPTIONS;
064
065    public static final String ERR_SETUP_FAILED = "SETUP_FAILED";
066    public static final String ERR_EXECUTION_FAILED = "EXECUTION_FAILED";
067    public static final String ERR_UNKNOWN_ERROR = "UNKOWN_ERROR";
068    public static final String ERR_COULD_NOT_CONNECT = "COULD_NOT_CONNECT";
069    public static final String ERR_HOST_RESOLUTION = "COULD_NOT_RESOLVE_HOST";
070    public static final String ERR_FNF = "FNF";
071    public static final String ERR_AUTH_FAILED = "AUTH_FAILED";
072    public static final String ERR_NO_EXEC_PERM = "NO_EXEC_PERM";
073    public static final String ERR_USER_MISMATCH = "ERR_USER_MISMATCH";
074    public static final String ERR_EXCEDE_LEN = "ERR_OUTPUT_EXCEED_MAX_LEN";
075
076    public static final String DELETE_TMP_DIR = "oozie.action.ssh.delete.remote.tmp.dir";
077
078    public static final String HTTP_COMMAND = "oozie.action.ssh.http.command";
079
080    public static final String HTTP_COMMAND_OPTIONS = "oozie.action.ssh.http.command.post.options";
081
082    private static final String EXT_STATUS_VAR = "#status";
083
084    private static int maxLen;
085    private static boolean allowSshUserAtHost;
086
087    protected SshActionExecutor() {
088        super(ACTION_TYPE);
089    }
090
091    /**
092     * Initialize Action.
093     */
094    @Override
095    public void initActionType() {
096        super.initActionType();
097        maxLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024);
098        allowSshUserAtHost = getOozieConf().getBoolean(CONF_SSH_ALLOW_USER_AT_HOST, true);
099        registerError(InterruptedException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH001");
100        registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH002");
101        initSshScripts();
102    }
103
104    /**
105     * Check ssh action status.
106     *
107     * @param context action execution context.
108     * @param action action object.
109     */
110    @Override
111    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
112        Status status = getActionStatus(context, action);
113        boolean captureOutput = false;
114        try {
115            Element eConf = XmlUtils.parseXml(action.getConf());
116            Namespace ns = eConf.getNamespace();
117            captureOutput = eConf.getChild("capture-output", ns) != null;
118        }
119        catch (JDOMException ex) {
120            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_XML_PARSE_FAILED",
121                                              "unknown error", ex);
122        }
123        XLog log = XLog.getLog(getClass());
124        log.debug("Capture Output: {0}", captureOutput);
125        if (status == Status.OK) {
126            if (captureOutput) {
127                String outFile = getRemoteFileName(context, action, "stdout", false, true);
128                String dataCommand = SSH_COMMAND_BASE + action.getTrackerUri() + " cat " + outFile;
129                log.debug("Ssh command [{0}]", dataCommand);
130                try {
131                    Process process = Runtime.getRuntime().exec(dataCommand.split("\\s"));
132                    StringBuffer buffer = new StringBuffer();
133                    boolean overflow = false;
134                    drainBuffers(process, buffer, null, maxLen);
135                    if (buffer.length() > maxLen) {
136                        overflow = true;
137                    }
138                    if (overflow) {
139                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR,
140                                                          "ERR_OUTPUT_EXCEED_MAX_LEN", "unknown error");
141                    }
142                    context.setExecutionData(status.toString(), PropertiesUtils.stringToProperties(buffer.toString()));
143                }
144                catch (Exception ex) {
145                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_UNKNOWN_ERROR",
146                                                      "unknown error", ex);
147                }
148            }
149            else {
150                context.setExecutionData(status.toString(), null);
151            }
152        }
153        else {
154            if (status == Status.ERROR) {
155                context.setExecutionData(status.toString(), null);
156            }
157            else {
158                context.setExternalStatus(status.toString());
159            }
160        }
161    }
162
163    /**
164     * Kill ssh action.
165     *
166     * @param context action execution context.
167     * @param action object.
168     */
169    @Override
170    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
171        String command = "ssh " + action.getTrackerUri() + " kill  -KILL " + action.getExternalId();
172        int returnValue = getReturnValue(command);
173        if (returnValue != 0) {
174            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_TO_KILL", XLog.format(
175                    "Unable to kill process {0} on {1}", action.getExternalId(), action.getTrackerUri()));
176        }
177        context.setEndData(WorkflowAction.Status.KILLED, "ERROR");
178    }
179
180    /**
181     * Start the ssh action execution.
182     *
183     * @param context action execution context.
184     * @param action action object.
185     */
186    @SuppressWarnings("unchecked")
187    @Override
188    public void start(final Context context, final WorkflowAction action) throws ActionExecutorException {
189        XLog log = XLog.getLog(getClass());
190        log.info("start() begins");
191        String confStr = action.getConf();
192        Element conf;
193        try {
194            conf = XmlUtils.parseXml(confStr);
195        }
196        catch (Exception ex) {
197            throw convertException(ex);
198        }
199        Namespace nameSpace = conf.getNamespace();
200        Element hostElement = conf.getChild("host", nameSpace);
201        String hostString = hostElement.getValue().trim();
202        hostString = prepareUserHost(hostString, context);
203        final String host = hostString;
204        final String dirLocation = execute(new Callable<String>() {
205            public String call() throws Exception {
206                return setupRemote(host, context, action);
207            }
208
209        });
210
211        String runningPid = execute(new Callable<String>() {
212            public String call() throws Exception {
213                return checkIfRunning(host, context, action);
214            }
215        });
216        String pid = "";
217
218        if (runningPid == null) {
219            final Element commandElement = conf.getChild("command", nameSpace);
220            final boolean ignoreOutput = conf.getChild("capture-output", nameSpace) == null;
221
222            boolean preserve = false;
223            if (commandElement != null) {
224                String[] args = null;
225                // Will either have <args>, <arg>, or neither (but not both)
226                List<Element> argsList = conf.getChildren("args", nameSpace);
227                // Arguments in an <args> are "flattened" (spaces are delimiters)
228                if (argsList != null && argsList.size() > 0) {
229                    StringBuilder argsString = new StringBuilder("");
230                    for (Element argsElement : argsList) {
231                        argsString = argsString.append(argsElement.getValue()).append(" ");
232                    }
233                    args = new String[]{argsString.toString()};
234                }
235                else {
236                    // Arguments in an <arg> are preserved, even with spaces
237                    argsList = conf.getChildren("arg", nameSpace);
238                    if (argsList != null && argsList.size() > 0) {
239                        preserve = true;
240                        args = new String[argsList.size()];
241                        for (int i = 0; i < argsList.size(); i++) {
242                            Element argsElement = argsList.get(i);
243                            args[i] = argsElement.getValue();
244                            // Even though we're keeping the args as an array, if they contain a space we still have to either quote
245                            // them or escape their space (because the scripts will split them up otherwise)
246                            if (args[i].contains(" ") &&
247                                    !(args[i].startsWith("\"") && args[i].endsWith("\"") ||
248                                      args[i].startsWith("'") && args[i].endsWith("'"))) {
249                                args[i] = StringUtils.escapeString(args[i], '\\', ' ');
250                            }
251                        }
252                    }
253                }
254                final String[] argsF = args;
255                final String recoveryId = context.getRecoveryId();
256                final boolean preserveF = preserve;
257                pid = execute(new Callable<String>() {
258
259                    @Override
260                    public String call() throws Exception {
261                        return doExecute(host, dirLocation, commandElement.getValue(), argsF, ignoreOutput, action, recoveryId,
262                                preserveF);
263                    }
264
265                });
266            }
267            context.setStartData(pid, host, host);
268        }
269        else {
270            pid = runningPid;
271            context.setStartData(pid, host, host);
272            check(context, action);
273        }
274        log.info("start() ends");
275    }
276
277    private String checkIfRunning(String host, final Context context, final WorkflowAction action) {
278        String pid = null;
279        String outFile = getRemoteFileName(context, action, "pid", false, false);
280        String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile;
281        try {
282            Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s"));
283            StringBuffer buffer = new StringBuffer();
284            drainBuffers(process, buffer, null, maxLen);
285            pid = getFirstLine(buffer);
286
287            if (Long.valueOf(pid) > 0) {
288                return pid;
289            }
290            else {
291                return null;
292            }
293        }
294        catch (Exception e) {
295            return null;
296        }
297    }
298
299    /**
300     * Get remote host working location.
301     *
302     * @param context action execution context
303     * @param action Action
304     * @param fileExtension Extension to be added to file name
305     * @param dirOnly Get the Directory only
306     * @param useExtId Flag to use external ID in the path
307     * @return remote host file name/Directory.
308     */
309    public String getRemoteFileName(Context context, WorkflowAction action, String fileExtension, boolean dirOnly,
310                                    boolean useExtId) {
311        String path = getActionDirPath(context.getWorkflow().getId(), action, ACTION_TYPE, false) + "/";
312        if (dirOnly) {
313            return path;
314        }
315        if (useExtId) {
316            path = path + action.getExternalId() + ".";
317        }
318        path = path + context.getRecoveryId() + "." + fileExtension;
319        return path;
320    }
321
322    /**
323     * Utility method to execute command.
324     *
325     * @param command Command to execute as String.
326     * @return exit status of the execution.
327     * @throws IOException if process exits with status nonzero.
328     * @throws InterruptedException if process does not run properly.
329     */
330    public int executeCommand(String command) throws IOException, InterruptedException {
331        Runtime runtime = Runtime.getRuntime();
332        Process p = runtime.exec(command.split("\\s"));
333
334        StringBuffer errorBuffer = new StringBuffer();
335        int exitValue = drainBuffers(p, null, errorBuffer, maxLen);
336
337        String error = null;
338        if (exitValue != 0) {
339            error = getTruncatedString(errorBuffer);
340            throw new IOException(XLog.format("Not able to perform operation [{0}]", command) + " | " + "ErrorStream: "
341                    + error);
342        }
343        return exitValue;
344    }
345
346    /**
347     * Do ssh action execution setup on remote host.
348     *
349     * @param host host name.
350     * @param context action execution context.
351     * @param action action object.
352     * @return remote host working directory.
353     * @throws IOException thrown if failed to setup.
354     * @throws InterruptedException thrown if any interruption happens.
355     */
356    protected String setupRemote(String host, Context context, WorkflowAction action) throws IOException, InterruptedException {
357        XLog log = XLog.getLog(getClass());
358        log.info("Attempting to copy ssh base scripts to remote host [{0}]", host);
359        String localDirLocation = Services.get().getRuntimeDir() + "/ssh";
360        if (localDirLocation.endsWith("/")) {
361            localDirLocation = localDirLocation.substring(0, localDirLocation.length() - 1);
362        }
363        File file = new File(localDirLocation + "/ssh-base.sh");
364        if (!file.exists()) {
365            throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
366        }
367        file = new File(localDirLocation + "/ssh-wrapper.sh");
368        if (!file.exists()) {
369            throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
370        }
371        String remoteDirLocation = getRemoteFileName(context, action, null, true, true);
372        String command = XLog.format("{0}{1}  mkdir -p {2} ", SSH_COMMAND_BASE, host, remoteDirLocation).toString();
373        executeCommand(command);
374        command = XLog.format("{0}{1}/ssh-base.sh {2}/ssh-wrapper.sh {3}:{4}", SCP_COMMAND_BASE, localDirLocation,
375                              localDirLocation, host, remoteDirLocation);
376        executeCommand(command);
377        command = XLog.format("{0}{1}  chmod +x {2}ssh-base.sh {3}ssh-wrapper.sh ", SSH_COMMAND_BASE, host,
378                              remoteDirLocation, remoteDirLocation);
379        executeCommand(command);
380        return remoteDirLocation;
381    }
382
383    /**
384     * Execute the ssh command.
385     *
386     * @param host hostname.
387     * @param dirLocation location of the base and wrapper scripts.
388     * @param cmnd command to be executed.
389     * @param args command arguments.
390     * @param ignoreOutput ignore output option.
391     * @param action action object.
392     * @param recoveryId action id + run number to enable recovery in rerun
393     * @param preserveArgs tell the ssh scripts to preserve or flatten the arguments
394     * @return process id of the running command.
395     * @throws IOException thrown if failed to run the command.
396     * @throws InterruptedException thrown if any interruption happens.
397     */
398    protected String doExecute(String host, String dirLocation, String cmnd, String[] args, boolean ignoreOutput,
399                               WorkflowAction action, String recoveryId, boolean preserveArgs)
400                               throws IOException, InterruptedException {
401        XLog log = XLog.getLog(getClass());
402        Runtime runtime = Runtime.getRuntime();
403        String callbackPost = ignoreOutput ? "_" : getOozieConf().get(HTTP_COMMAND_OPTIONS).replace(" ", "%%%");
404        String preserveArgsS = preserveArgs ? "PRESERVE_ARGS" : "FLATTEN_ARGS";
405        // TODO check
406        String callBackUrl = Services.get().get(CallbackService.class)
407                .createCallBackUrl(action.getId(), EXT_STATUS_VAR);
408        String command = XLog.format("{0}{1} {2}ssh-base.sh {3} {4} \"{5}\" \"{6}\" {7} {8} ", SSH_COMMAND_BASE, host, dirLocation,
409                                      preserveArgsS, getOozieConf().get(HTTP_COMMAND), callBackUrl, callbackPost, recoveryId, cmnd)
410                .toString();
411        String[] commandArray = command.split("\\s");
412        String[] finalCommand;
413        if (args == null) {
414            finalCommand = commandArray;
415        }
416        else {
417            finalCommand = new String[commandArray.length + args.length];
418            System.arraycopy(commandArray, 0, finalCommand, 0, commandArray.length);
419            System.arraycopy(args, 0, finalCommand, commandArray.length, args.length);
420        }
421        log.trace("Executing ssh command [{0}]", Arrays.toString(finalCommand));
422        Process p = runtime.exec(finalCommand);
423        String pid = "";
424
425        StringBuffer inputBuffer = new StringBuffer();
426        StringBuffer errorBuffer = new StringBuffer();
427        int exitValue = drainBuffers(p, inputBuffer, errorBuffer, maxLen);
428
429        pid = getFirstLine(inputBuffer);
430
431        String error = null;
432        if (exitValue != 0) {
433            error = getTruncatedString(errorBuffer);
434            throw new IOException(XLog.format("Not able to execute ssh-base.sh on {0}", host) + " | " + "ErrorStream: "
435                    + error);
436        }
437        return pid;
438    }
439
440    /**
441     * End action execution.
442     *
443     * @param context action execution context.
444     * @param action action object.
445     * @throws ActionExecutorException thrown if action end execution fails.
446     */
447    public void end(final Context context, final WorkflowAction action) throws ActionExecutorException {
448        if (action.getExternalStatus().equals("OK")) {
449            context.setEndData(WorkflowAction.Status.OK, WorkflowAction.Status.OK.toString());
450        }
451        else {
452            context.setEndData(WorkflowAction.Status.ERROR, WorkflowAction.Status.ERROR.toString());
453        }
454        boolean deleteTmpDir = getOozieConf().getBoolean(DELETE_TMP_DIR, true);
455        if (deleteTmpDir) {
456            String tmpDir = getRemoteFileName(context, action, null, true, false);
457            String removeTmpDirCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " rm -rf " + tmpDir;
458            int retVal = getReturnValue(removeTmpDirCmd);
459            if (retVal != 0) {
460                XLog.getLog(getClass()).warn("Cannot delete temp dir {0}", tmpDir);
461            }
462        }
463    }
464
465    /**
466     * Get the return value of a process.
467     *
468     * @param command command to be executed.
469     * @return zero if execution is successful and any non zero value for failure.
470     * @throws ActionExecutorException
471     */
472    private int getReturnValue(String command) throws ActionExecutorException {
473        int returnValue;
474        Process ps = null;
475        try {
476            ps = Runtime.getRuntime().exec(command.split("\\s"));
477            returnValue = drainBuffers(ps, null, null, 0);
478        }
479        catch (IOException e) {
480            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog.format(
481                    "Not able to perform operation {0}", command), e);
482        }
483        finally {
484            ps.destroy();
485        }
486        return returnValue;
487    }
488
489    /**
490     * Copy the ssh base and wrapper scripts to the local directory.
491     */
492    private void initSshScripts() {
493        String dirLocation = Services.get().getRuntimeDir() + "/ssh";
494        File path = new File(dirLocation);
495        path.mkdirs();
496        if (!path.exists()) {
497            throw new RuntimeException(XLog.format("Not able to create required directory {0}", dirLocation));
498        }
499        try {
500            IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-base.sh", -1), new FileWriter(dirLocation
501                    + "/ssh-base.sh"));
502            IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-wrapper.sh", -1), new FileWriter(dirLocation
503                    + "/ssh-wrapper.sh"));
504        }
505        catch (IOException ie) {
506            throw new RuntimeException(XLog.format("Not able to copy required scripts file to {0} "
507                    + "for SshActionHandler", dirLocation));
508        }
509    }
510
511    /**
512     * Get action status.
513     *
514     * @param action action object.
515     * @return status of the action(RUNNING/OK/ERROR).
516     * @throws ActionExecutorException thrown if there is any error in getting status.
517     */
518    protected Status getActionStatus(Context context, WorkflowAction action) throws ActionExecutorException {
519        String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " + action.getExternalId();
520        Status aStatus;
521        int returnValue = getReturnValue(command);
522        if (returnValue == 0) {
523            aStatus = Status.RUNNING;
524        }
525        else {
526            String outFile = getRemoteFileName(context, action, "error", false, true);
527            String checkErrorCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " ls " + outFile;
528            int retVal = getReturnValue(checkErrorCmd);
529            if (retVal == 0) {
530                aStatus = Status.ERROR;
531            }
532            else {
533                aStatus = Status.OK;
534            }
535        }
536        return aStatus;
537    }
538
539    /**
540     * Execute the callable.
541     *
542     * @param callable required callable.
543     * @throws ActionExecutorException thrown if there is any error in command execution.
544     */
545    private <T> T execute(Callable<T> callable) throws ActionExecutorException {
546        XLog log = XLog.getLog(getClass());
547        try {
548            return callable.call();
549        }
550        catch (IOException ex) {
551            log.warn("Error while executing ssh EXECUTION");
552            String errorMessage = ex.getMessage();
553            if (null == errorMessage) { // Unknown IOException
554                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex
555                        .getMessage(), ex);
556            } // Host Resolution Issues
557            else {
558                if (errorMessage.contains("Could not resolve hostname") ||
559                        errorMessage.contains("service not known")) {
560                    throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_HOST_RESOLUTION, ex
561                            .getMessage(), ex);
562                } // Connection Timeout. Host temporarily down.
563                else {
564                    if (errorMessage.contains("timed out")) {
565                        throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_COULD_NOT_CONNECT,
566                                                          ex.getMessage(), ex);
567                    }// Local ssh-base or ssh-wrapper missing
568                    else {
569                        if (errorMessage.contains("Required Local file")) {
570                            throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
571                                                              ex.getMessage(), ex); // local_FNF
572                        }// Required oozie bash scripts missing, after the copy was
573                        // successful
574                        else {
575                            if (errorMessage.contains("No such file or directory")
576                                    && (errorMessage.contains("ssh-base") || errorMessage.contains("ssh-wrapper"))) {
577                                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
578                                                                  ex.getMessage(), ex); // remote
579                                // FNF
580                            } // Required application execution binary missing (either
581                            // caught by ssh-wrapper
582                            else {
583                                if (errorMessage.contains("command not found")) {
584                                    throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_FNF, ex
585                                            .getMessage(), ex); // remote
586                                    // FNF
587                                } // Permission denied while connecting
588                                else {
589                                    if (errorMessage.contains("Permission denied")) {
590                                        throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT,
591                                                ERR_AUTH_FAILED, ex.getMessage(), ex);
592                                    } // Permission denied while executing
593                                    else {
594                                        if (errorMessage.contains(": Permission denied")) {
595                                            throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT,
596                                                    ERR_NO_EXEC_PERM, ex.getMessage(), ex);
597                                        }
598                                        else {
599                                            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR,
600                                                    ERR_UNKNOWN_ERROR, ex.getMessage(), ex);
601                                        }
602                                    }
603                                }
604                            }
605                        }
606                    }
607                }
608            }
609        } // Any other type of exception
610        catch (Exception ex) {
611            throw convertException(ex);
612        }
613    }
614
615    /**
616     * Checks whether the system is configured to always use the oozie user for ssh, and injects the user if required.
617     *
618     * @param host the host string.
619     * @param context the execution context.
620     * @return the modified host string with a user parameter added on if required.
621     * @throws ActionExecutorException in case the flag to use the oozie user is turned on and there is a mismatch
622     * between the user specified in the host and the oozie user.
623     */
624    private String prepareUserHost(String host, Context context) throws ActionExecutorException {
625        String oozieUser = context.getProtoActionConf().get(OozieClient.USER_NAME);
626        if (allowSshUserAtHost) {
627            if (!host.contains("@")) {
628                host = oozieUser + "@" + host;
629            }
630        }
631        else {
632            if (host.contains("@")) {
633                if (!host.toLowerCase().startsWith(oozieUser + "@")) {
634                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_USER_MISMATCH,
635                                                      XLog.format("user mismatch between oozie user [{0}] and ssh host [{1}]",
636                                                              oozieUser, host));
637                }
638            }
639            else {
640                host = oozieUser + "@" + host;
641            }
642        }
643        return host;
644    }
645
646    @Override
647    public boolean isCompleted(String externalStatus) {
648        return true;
649    }
650
651    /**
652     * Truncate the string to max length.
653     *
654     * @param strBuffer
655     * @return truncated string string
656     */
657    private String getTruncatedString(StringBuffer strBuffer) {
658
659        if (strBuffer.length() <= maxLen) {
660            return strBuffer.toString();
661        }
662        else {
663            return strBuffer.substring(0, maxLen);
664        }
665    }
666
667    /**
668     * Drains the inputStream and errorStream of the Process being executed. The contents of the streams are stored if a
669     * buffer is provided for the stream.
670     *
671     * @param p The Process instance.
672     * @param inputBuffer The buffer into which STDOUT is to be read. Can be null if only draining is required.
673     * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required.
674     * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the
675     * store content may exceed this length.
676     * @return the exit value of the process.
677     * @throws IOException
678     */
679    private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength)
680            throws IOException {
681        int exitValue = -1;
682        BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream()));
683        BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream()));
684
685        int inBytesRead = 0;
686        int errBytesRead = 0;
687
688        boolean processEnded = false;
689
690        try {
691            while (!processEnded) {
692                try {
693                    exitValue = p.exitValue();
694                    processEnded = true;
695                }
696                catch (IllegalThreadStateException ex) {
697                    // Continue to drain.
698                }
699
700                inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded);
701                errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded);
702            }
703        }
704        finally {
705            ir.close();
706            er.close();
707        }
708        return exitValue;
709    }
710
711    /**
712     * Reads the contents of a stream and stores them into the provided buffer.
713     *
714     * @param br The stream to be read.
715     * @param storageBuf The buffer into which the contents of the stream are to be stored.
716     * @param maxLength The maximum number of bytes to be stored in the buffer. An indicative value and may be
717     * exceeded.
718     * @param bytesRead The number of bytes read from this stream to date.
719     * @param readAll If true, the stream is drained while their is data available in it. Otherwise, only a single chunk
720     * of data is read, irrespective of how much is available.
721     * @return
722     * @throws IOException
723     */
724    private int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll)
725            throws IOException {
726        int bReadSession = 0;
727        if (br.ready()) {
728            char[] buf = new char[1024];
729            do {
730                int bReadCurrent = br.read(buf, 0, 1024);
731                if (storageBuf != null && bytesRead < maxLength) {
732                    storageBuf.append(buf, 0, bReadCurrent);
733                }
734                bReadSession += bReadCurrent;
735            } while (br.ready() && readAll);
736        }
737        return bReadSession;
738    }
739
740    /**
741     * Returns the first line from a StringBuffer, recognized by the new line character \n.
742     *
743     * @param buffer The StringBuffer from which the first line is required.
744     * @return The first line of the buffer.
745     */
746    private String getFirstLine(StringBuffer buffer) {
747        int newLineIndex = 0;
748        newLineIndex = buffer.indexOf("\n");
749        if (newLineIndex == -1) {
750            return buffer.toString();
751        }
752        else {
753            return buffer.substring(0, newLineIndex);
754        }
755    }
756}