001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.ssh;
020
021import java.io.BufferedReader;
022import java.io.File;
023import java.io.FileWriter;
024import java.io.IOException;
025import java.io.InputStreamReader;
026import java.util.Arrays;
027import java.util.List;
028import java.util.concurrent.Callable;
029import org.apache.hadoop.util.StringUtils;
030
031import org.apache.oozie.client.WorkflowAction;
032import org.apache.oozie.client.OozieClient;
033import org.apache.oozie.client.WorkflowAction.Status;
034import org.apache.oozie.action.ActionExecutor;
035import org.apache.oozie.action.ActionExecutorException;
036import org.apache.oozie.service.CallbackService;
037import org.apache.oozie.service.ConfigurationService;
038import org.apache.oozie.servlet.CallbackServlet;
039import org.apache.oozie.service.Services;
040import org.apache.oozie.util.IOUtils;
041import org.apache.oozie.util.PropertiesUtils;
042import org.apache.oozie.util.XLog;
043import org.apache.oozie.util.XmlUtils;
044import org.jdom.Element;
045import org.jdom.JDOMException;
046import org.jdom.Namespace;
047
048/**
049 * Ssh action executor. <p> <ul> <li>Execute the shell commands on the remote host</li> <li>Copies the base and wrapper
050 * scripts on to the remote location</li> <li>Base script is used to run the command on the remote host</li> <li>Wrapper
051 * script is used to check the status of the submitted command</li> <li>handles the submission failures</li> </ul>
052 */
053public class SshActionExecutor extends ActionExecutor {
054    public static final String ACTION_TYPE = "ssh";
055
056    /**
057     * Configuration parameter which specifies whether the specified ssh user is allowed, or has to be the job user.
058     */
059    public static final String CONF_SSH_ALLOW_USER_AT_HOST = CONF_PREFIX + "ssh.allow.user.at.host";
060
061    protected static final String SSH_COMMAND_OPTIONS =
062            "-o PasswordAuthentication=no -o KbdInteractiveDevices=no -o StrictHostKeyChecking=no -o ConnectTimeout=20 ";
063
064    protected static final String SSH_COMMAND_BASE = "ssh " + SSH_COMMAND_OPTIONS;
065    protected static final String SCP_COMMAND_BASE = "scp " + SSH_COMMAND_OPTIONS;
066
067    public static final String ERR_SETUP_FAILED = "SETUP_FAILED";
068    public static final String ERR_EXECUTION_FAILED = "EXECUTION_FAILED";
069    public static final String ERR_UNKNOWN_ERROR = "UNKOWN_ERROR";
070    public static final String ERR_COULD_NOT_CONNECT = "COULD_NOT_CONNECT";
071    public static final String ERR_HOST_RESOLUTION = "COULD_NOT_RESOLVE_HOST";
072    public static final String ERR_FNF = "FNF";
073    public static final String ERR_AUTH_FAILED = "AUTH_FAILED";
074    public static final String ERR_NO_EXEC_PERM = "NO_EXEC_PERM";
075    public static final String ERR_USER_MISMATCH = "ERR_USER_MISMATCH";
076    public static final String ERR_EXCEDE_LEN = "ERR_OUTPUT_EXCEED_MAX_LEN";
077
078    public static final String DELETE_TMP_DIR = "oozie.action.ssh.delete.remote.tmp.dir";
079
080    public static final String HTTP_COMMAND = "oozie.action.ssh.http.command";
081
082    public static final String HTTP_COMMAND_OPTIONS = "oozie.action.ssh.http.command.post.options";
083
084    private static final String EXT_STATUS_VAR = "#status";
085
086    private static int maxLen;
087    private static boolean allowSshUserAtHost;
088
089    protected SshActionExecutor() {
090        super(ACTION_TYPE);
091    }
092
093    /**
094     * Initialize Action.
095     */
096    @Override
097    public void initActionType() {
098        super.initActionType();
099        maxLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024);
100        allowSshUserAtHost = ConfigurationService.getBoolean(CONF_SSH_ALLOW_USER_AT_HOST);
101        registerError(InterruptedException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH001");
102        registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH002");
103        initSshScripts();
104    }
105
106    /**
107     * Check ssh action status.
108     *
109     * @param context action execution context.
110     * @param action action object.
111     */
112    @Override
113    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
114        Status status = getActionStatus(context, action);
115        boolean captureOutput = false;
116        try {
117            Element eConf = XmlUtils.parseXml(action.getConf());
118            Namespace ns = eConf.getNamespace();
119            captureOutput = eConf.getChild("capture-output", ns) != null;
120        }
121        catch (JDOMException ex) {
122            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_XML_PARSE_FAILED",
123                                              "unknown error", ex);
124        }
125        XLog log = XLog.getLog(getClass());
126        log.debug("Capture Output: {0}", captureOutput);
127        if (status == Status.OK) {
128            if (captureOutput) {
129                String outFile = getRemoteFileName(context, action, "stdout", false, true);
130                String dataCommand = SSH_COMMAND_BASE + action.getTrackerUri() + " cat " + outFile;
131                log.debug("Ssh command [{0}]", dataCommand);
132                try {
133                    Process process = Runtime.getRuntime().exec(dataCommand.split("\\s"));
134                    StringBuffer buffer = new StringBuffer();
135                    boolean overflow = false;
136                    drainBuffers(process, buffer, null, maxLen);
137                    if (buffer.length() > maxLen) {
138                        overflow = true;
139                    }
140                    if (overflow) {
141                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR,
142                                                          "ERR_OUTPUT_EXCEED_MAX_LEN", "unknown error");
143                    }
144                    context.setExecutionData(status.toString(), PropertiesUtils.stringToProperties(buffer.toString()));
145                }
146                catch (Exception ex) {
147                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_UNKNOWN_ERROR",
148                                                      "unknown error", ex);
149                }
150            }
151            else {
152                context.setExecutionData(status.toString(), null);
153            }
154        }
155        else {
156            if (status == Status.ERROR) {
157                context.setExecutionData(status.toString(), null);
158            }
159            else {
160                context.setExternalStatus(status.toString());
161            }
162        }
163    }
164
165    /**
166     * Kill ssh action.
167     *
168     * @param context action execution context.
169     * @param action object.
170     */
171    @Override
172    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
173        String command = "ssh " + action.getTrackerUri() + " kill  -KILL " + action.getExternalId();
174        int returnValue = getReturnValue(command);
175        if (returnValue != 0) {
176            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_TO_KILL", XLog.format(
177                    "Unable to kill process {0} on {1}", action.getExternalId(), action.getTrackerUri()));
178        }
179        context.setEndData(WorkflowAction.Status.KILLED, "ERROR");
180    }
181
182    /**
183     * Start the ssh action execution.
184     *
185     * @param context action execution context.
186     * @param action action object.
187     */
188    @SuppressWarnings("unchecked")
189    @Override
190    public void start(final Context context, final WorkflowAction action) throws ActionExecutorException {
191        XLog log = XLog.getLog(getClass());
192        log.info("start() begins");
193        String confStr = action.getConf();
194        Element conf;
195        try {
196            conf = XmlUtils.parseXml(confStr);
197        }
198        catch (Exception ex) {
199            throw convertException(ex);
200        }
201        Namespace nameSpace = conf.getNamespace();
202        Element hostElement = conf.getChild("host", nameSpace);
203        String hostString = hostElement.getValue().trim();
204        hostString = prepareUserHost(hostString, context);
205        final String host = hostString;
206        final String dirLocation = execute(new Callable<String>() {
207            public String call() throws Exception {
208                return setupRemote(host, context, action);
209            }
210
211        });
212
213        String runningPid = execute(new Callable<String>() {
214            public String call() throws Exception {
215                return checkIfRunning(host, context, action);
216            }
217        });
218        String pid = "";
219
220        if (runningPid == null) {
221            final Element commandElement = conf.getChild("command", nameSpace);
222            final boolean ignoreOutput = conf.getChild("capture-output", nameSpace) == null;
223
224            boolean preserve = false;
225            if (commandElement != null) {
226                String[] args = null;
227                // Will either have <args>, <arg>, or neither (but not both)
228                List<Element> argsList = conf.getChildren("args", nameSpace);
229                // Arguments in an <args> are "flattened" (spaces are delimiters)
230                if (argsList != null && argsList.size() > 0) {
231                    StringBuilder argsString = new StringBuilder("");
232                    for (Element argsElement : argsList) {
233                        argsString = argsString.append(argsElement.getValue()).append(" ");
234                    }
235                    args = new String[]{argsString.toString()};
236                }
237                else {
238                    // Arguments in an <arg> are preserved, even with spaces
239                    argsList = conf.getChildren("arg", nameSpace);
240                    if (argsList != null && argsList.size() > 0) {
241                        preserve = true;
242                        args = new String[argsList.size()];
243                        for (int i = 0; i < argsList.size(); i++) {
244                            Element argsElement = argsList.get(i);
245                            args[i] = argsElement.getValue();
246                            // Even though we're keeping the args as an array, if they contain a space we still have to either quote
247                            // them or escape their space (because the scripts will split them up otherwise)
248                            if (args[i].contains(" ") &&
249                                    !(args[i].startsWith("\"") && args[i].endsWith("\"") ||
250                                      args[i].startsWith("'") && args[i].endsWith("'"))) {
251                                args[i] = StringUtils.escapeString(args[i], '\\', ' ');
252                            }
253                        }
254                    }
255                }
256                final String[] argsF = args;
257                final String recoveryId = context.getRecoveryId();
258                final boolean preserveF = preserve;
259                pid = execute(new Callable<String>() {
260
261                    @Override
262                    public String call() throws Exception {
263                        return doExecute(host, dirLocation, commandElement.getValue(), argsF, ignoreOutput, action, recoveryId,
264                                preserveF);
265                    }
266
267                });
268            }
269            context.setStartData(pid, host, host);
270        }
271        else {
272            pid = runningPid;
273            context.setStartData(pid, host, host);
274            check(context, action);
275        }
276        log.info("start() ends");
277    }
278
279    private String checkIfRunning(String host, final Context context, final WorkflowAction action) {
280        String pid = null;
281        String outFile = getRemoteFileName(context, action, "pid", false, false);
282        String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile;
283        try {
284            Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s"));
285            StringBuffer buffer = new StringBuffer();
286            drainBuffers(process, buffer, null, maxLen);
287            pid = getFirstLine(buffer);
288
289            if (Long.valueOf(pid) > 0) {
290                return pid;
291            }
292            else {
293                return null;
294            }
295        }
296        catch (Exception e) {
297            return null;
298        }
299    }
300
301    /**
302     * Get remote host working location.
303     *
304     * @param context action execution context
305     * @param action Action
306     * @param fileExtension Extension to be added to file name
307     * @param dirOnly Get the Directory only
308     * @param useExtId Flag to use external ID in the path
309     * @return remote host file name/Directory.
310     */
311    public String getRemoteFileName(Context context, WorkflowAction action, String fileExtension, boolean dirOnly,
312                                    boolean useExtId) {
313        String path = getActionDirPath(context.getWorkflow().getId(), action, ACTION_TYPE, false) + "/";
314        if (dirOnly) {
315            return path;
316        }
317        if (useExtId) {
318            path = path + action.getExternalId() + ".";
319        }
320        path = path + context.getRecoveryId() + "." + fileExtension;
321        return path;
322    }
323
324    /**
325     * Utility method to execute command.
326     *
327     * @param command Command to execute as String.
328     * @return exit status of the execution.
329     * @throws IOException if process exits with status nonzero.
330     * @throws InterruptedException if process does not run properly.
331     */
332    public int executeCommand(String command) throws IOException, InterruptedException {
333        Runtime runtime = Runtime.getRuntime();
334        Process p = runtime.exec(command.split("\\s"));
335
336        StringBuffer errorBuffer = new StringBuffer();
337        int exitValue = drainBuffers(p, null, errorBuffer, maxLen);
338
339        String error = null;
340        if (exitValue != 0) {
341            error = getTruncatedString(errorBuffer);
342            throw new IOException(XLog.format("Not able to perform operation [{0}]", command) + " | " + "ErrorStream: "
343                    + error);
344        }
345        return exitValue;
346    }
347
348    /**
349     * Do ssh action execution setup on remote host.
350     *
351     * @param host host name.
352     * @param context action execution context.
353     * @param action action object.
354     * @return remote host working directory.
355     * @throws IOException thrown if failed to setup.
356     * @throws InterruptedException thrown if any interruption happens.
357     */
358    protected String setupRemote(String host, Context context, WorkflowAction action) throws IOException, InterruptedException {
359        XLog log = XLog.getLog(getClass());
360        log.info("Attempting to copy ssh base scripts to remote host [{0}]", host);
361        String localDirLocation = Services.get().getRuntimeDir() + "/ssh";
362        if (localDirLocation.endsWith("/")) {
363            localDirLocation = localDirLocation.substring(0, localDirLocation.length() - 1);
364        }
365        File file = new File(localDirLocation + "/ssh-base.sh");
366        if (!file.exists()) {
367            throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
368        }
369        file = new File(localDirLocation + "/ssh-wrapper.sh");
370        if (!file.exists()) {
371            throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
372        }
373        String remoteDirLocation = getRemoteFileName(context, action, null, true, true);
374        String command = XLog.format("{0}{1}  mkdir -p {2} ", SSH_COMMAND_BASE, host, remoteDirLocation).toString();
375        executeCommand(command);
376        command = XLog.format("{0}{1}/ssh-base.sh {2}/ssh-wrapper.sh {3}:{4}", SCP_COMMAND_BASE, localDirLocation,
377                              localDirLocation, host, remoteDirLocation);
378        executeCommand(command);
379        command = XLog.format("{0}{1}  chmod +x {2}ssh-base.sh {3}ssh-wrapper.sh ", SSH_COMMAND_BASE, host,
380                              remoteDirLocation, remoteDirLocation);
381        executeCommand(command);
382        return remoteDirLocation;
383    }
384
385    /**
386     * Execute the ssh command.
387     *
388     * @param host hostname.
389     * @param dirLocation location of the base and wrapper scripts.
390     * @param cmnd command to be executed.
391     * @param args command arguments.
392     * @param ignoreOutput ignore output option.
393     * @param action action object.
394     * @param recoveryId action id + run number to enable recovery in rerun
395     * @param preserveArgs tell the ssh scripts to preserve or flatten the arguments
396     * @return process id of the running command.
397     * @throws IOException thrown if failed to run the command.
398     * @throws InterruptedException thrown if any interruption happens.
399     */
400    protected String doExecute(String host, String dirLocation, String cmnd, String[] args, boolean ignoreOutput,
401                               WorkflowAction action, String recoveryId, boolean preserveArgs)
402                               throws IOException, InterruptedException {
403        XLog log = XLog.getLog(getClass());
404        Runtime runtime = Runtime.getRuntime();
405        String callbackPost = ignoreOutput ? "_" : ConfigurationService.get(HTTP_COMMAND_OPTIONS).replace(" ", "%%%");
406        String preserveArgsS = preserveArgs ? "PRESERVE_ARGS" : "FLATTEN_ARGS";
407        // TODO check
408        String callBackUrl = Services.get().get(CallbackService.class)
409                .createCallBackUrl(action.getId(), EXT_STATUS_VAR);
410        String command = XLog.format("{0}{1} {2}ssh-base.sh {3} {4} \"{5}\" \"{6}\" {7} {8} ", SSH_COMMAND_BASE, host, dirLocation,
411                preserveArgsS, ConfigurationService.get(HTTP_COMMAND), callBackUrl, callbackPost, recoveryId, cmnd)
412                .toString();
413        String[] commandArray = command.split("\\s");
414        String[] finalCommand;
415        if (args == null) {
416            finalCommand = commandArray;
417        }
418        else {
419            finalCommand = new String[commandArray.length + args.length];
420            System.arraycopy(commandArray, 0, finalCommand, 0, commandArray.length);
421            System.arraycopy(args, 0, finalCommand, commandArray.length, args.length);
422        }
423        log.trace("Executing ssh command [{0}]", Arrays.toString(finalCommand));
424        Process p = runtime.exec(finalCommand);
425        String pid = "";
426
427        StringBuffer inputBuffer = new StringBuffer();
428        StringBuffer errorBuffer = new StringBuffer();
429        int exitValue = drainBuffers(p, inputBuffer, errorBuffer, maxLen);
430
431        pid = getFirstLine(inputBuffer);
432
433        String error = null;
434        if (exitValue != 0) {
435            error = getTruncatedString(errorBuffer);
436            throw new IOException(XLog.format("Not able to execute ssh-base.sh on {0}", host) + " | " + "ErrorStream: "
437                    + error);
438        }
439        return pid;
440    }
441
442    /**
443     * End action execution.
444     *
445     * @param context action execution context.
446     * @param action action object.
447     * @throws ActionExecutorException thrown if action end execution fails.
448     */
449    public void end(final Context context, final WorkflowAction action) throws ActionExecutorException {
450        if (action.getExternalStatus().equals("OK")) {
451            context.setEndData(WorkflowAction.Status.OK, WorkflowAction.Status.OK.toString());
452        }
453        else {
454            context.setEndData(WorkflowAction.Status.ERROR, WorkflowAction.Status.ERROR.toString());
455        }
456        boolean deleteTmpDir = ConfigurationService.getBoolean(DELETE_TMP_DIR);
457        if (deleteTmpDir) {
458            String tmpDir = getRemoteFileName(context, action, null, true, false);
459            String removeTmpDirCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " rm -rf " + tmpDir;
460            int retVal = getReturnValue(removeTmpDirCmd);
461            if (retVal != 0) {
462                XLog.getLog(getClass()).warn("Cannot delete temp dir {0}", tmpDir);
463            }
464        }
465    }
466
467    /**
468     * Get the return value of a process.
469     *
470     * @param command command to be executed.
471     * @return zero if execution is successful and any non zero value for failure.
472     * @throws ActionExecutorException
473     */
474    private int getReturnValue(String command) throws ActionExecutorException {
475        int returnValue;
476        Process ps = null;
477        try {
478            ps = Runtime.getRuntime().exec(command.split("\\s"));
479            returnValue = drainBuffers(ps, null, null, 0);
480        }
481        catch (IOException e) {
482            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog.format(
483                    "Not able to perform operation {0}", command), e);
484        }
485        finally {
486            ps.destroy();
487        }
488        return returnValue;
489    }
490
491    /**
492     * Copy the ssh base and wrapper scripts to the local directory.
493     */
494    private void initSshScripts() {
495        String dirLocation = Services.get().getRuntimeDir() + "/ssh";
496        File path = new File(dirLocation);
497        path.mkdirs();
498        if (!path.exists()) {
499            throw new RuntimeException(XLog.format("Not able to create required directory {0}", dirLocation));
500        }
501        try {
502            IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-base.sh", -1), new FileWriter(dirLocation
503                    + "/ssh-base.sh"));
504            IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-wrapper.sh", -1), new FileWriter(dirLocation
505                    + "/ssh-wrapper.sh"));
506        }
507        catch (IOException ie) {
508            throw new RuntimeException(XLog.format("Not able to copy required scripts file to {0} "
509                    + "for SshActionHandler", dirLocation));
510        }
511    }
512
513    /**
514     * Get action status.
515     *
516     * @param action action object.
517     * @return status of the action(RUNNING/OK/ERROR).
518     * @throws ActionExecutorException thrown if there is any error in getting status.
519     */
520    protected Status getActionStatus(Context context, WorkflowAction action) throws ActionExecutorException {
521        String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " + action.getExternalId();
522        Status aStatus;
523        int returnValue = getReturnValue(command);
524        if (returnValue == 0) {
525            aStatus = Status.RUNNING;
526        }
527        else {
528            String outFile = getRemoteFileName(context, action, "error", false, true);
529            String checkErrorCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " ls " + outFile;
530            int retVal = getReturnValue(checkErrorCmd);
531            if (retVal == 0) {
532                aStatus = Status.ERROR;
533            }
534            else {
535                aStatus = Status.OK;
536            }
537        }
538        return aStatus;
539    }
540
541    /**
542     * Execute the callable.
543     *
544     * @param callable required callable.
545     * @throws ActionExecutorException thrown if there is any error in command execution.
546     */
547    private <T> T execute(Callable<T> callable) throws ActionExecutorException {
548        XLog log = XLog.getLog(getClass());
549        try {
550            return callable.call();
551        }
552        catch (IOException ex) {
553            log.warn("Error while executing ssh EXECUTION");
554            String errorMessage = ex.getMessage();
555            if (null == errorMessage) { // Unknown IOException
556                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex
557                        .getMessage(), ex);
558            } // Host Resolution Issues
559            else {
560                if (errorMessage.contains("Could not resolve hostname") ||
561                        errorMessage.contains("service not known")) {
562                    throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_HOST_RESOLUTION, ex
563                            .getMessage(), ex);
564                } // Connection Timeout. Host temporarily down.
565                else {
566                    if (errorMessage.contains("timed out")) {
567                        throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_COULD_NOT_CONNECT,
568                                                          ex.getMessage(), ex);
569                    }// Local ssh-base or ssh-wrapper missing
570                    else {
571                        if (errorMessage.contains("Required Local file")) {
572                            throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
573                                                              ex.getMessage(), ex); // local_FNF
574                        }// Required oozie bash scripts missing, after the copy was
575                        // successful
576                        else {
577                            if (errorMessage.contains("No such file or directory")
578                                    && (errorMessage.contains("ssh-base") || errorMessage.contains("ssh-wrapper"))) {
579                                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
580                                                                  ex.getMessage(), ex); // remote
581                                // FNF
582                            } // Required application execution binary missing (either
583                            // caught by ssh-wrapper
584                            else {
585                                if (errorMessage.contains("command not found")) {
586                                    throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_FNF, ex
587                                            .getMessage(), ex); // remote
588                                    // FNF
589                                } // Permission denied while connecting
590                                else {
591                                    if (errorMessage.contains("Permission denied")) {
592                                        throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT,
593                                                ERR_AUTH_FAILED, ex.getMessage(), ex);
594                                    } // Permission denied while executing
595                                    else {
596                                        if (errorMessage.contains(": Permission denied")) {
597                                            throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT,
598                                                    ERR_NO_EXEC_PERM, ex.getMessage(), ex);
599                                        }
600                                        else {
601                                            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR,
602                                                    ERR_UNKNOWN_ERROR, ex.getMessage(), ex);
603                                        }
604                                    }
605                                }
606                            }
607                        }
608                    }
609                }
610            }
611        } // Any other type of exception
612        catch (Exception ex) {
613            throw convertException(ex);
614        }
615    }
616
617    /**
618     * Checks whether the system is configured to always use the oozie user for ssh, and injects the user if required.
619     *
620     * @param host the host string.
621     * @param context the execution context.
622     * @return the modified host string with a user parameter added on if required.
623     * @throws ActionExecutorException in case the flag to use the oozie user is turned on and there is a mismatch
624     * between the user specified in the host and the oozie user.
625     */
626    private String prepareUserHost(String host, Context context) throws ActionExecutorException {
627        String oozieUser = context.getProtoActionConf().get(OozieClient.USER_NAME);
628        if (allowSshUserAtHost) {
629            if (!host.contains("@")) {
630                host = oozieUser + "@" + host;
631            }
632        }
633        else {
634            if (host.contains("@")) {
635                if (!host.toLowerCase().startsWith(oozieUser + "@")) {
636                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_USER_MISMATCH,
637                                                      XLog.format("user mismatch between oozie user [{0}] and ssh host [{1}]",
638                                                              oozieUser, host));
639                }
640            }
641            else {
642                host = oozieUser + "@" + host;
643            }
644        }
645        return host;
646    }
647
648    @Override
649    public boolean isCompleted(String externalStatus) {
650        return true;
651    }
652
653    /**
654     * Truncate the string to max length.
655     *
656     * @param strBuffer
657     * @return truncated string string
658     */
659    private String getTruncatedString(StringBuffer strBuffer) {
660
661        if (strBuffer.length() <= maxLen) {
662            return strBuffer.toString();
663        }
664        else {
665            return strBuffer.substring(0, maxLen);
666        }
667    }
668
669    /**
670     * Drains the inputStream and errorStream of the Process being executed. The contents of the streams are stored if a
671     * buffer is provided for the stream.
672     *
673     * @param p The Process instance.
674     * @param inputBuffer The buffer into which STDOUT is to be read. Can be null if only draining is required.
675     * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required.
676     * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the
677     * store content may exceed this length.
678     * @return the exit value of the process.
679     * @throws IOException
680     */
681    private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength)
682            throws IOException {
683        int exitValue = -1;
684        BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream()));
685        BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream()));
686
687        int inBytesRead = 0;
688        int errBytesRead = 0;
689
690        boolean processEnded = false;
691
692        try {
693            while (!processEnded) {
694                try {
695                    exitValue = p.exitValue();
696                    processEnded = true;
697                }
698                catch (IllegalThreadStateException ex) {
699                    // Continue to drain.
700                }
701
702                inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded);
703                errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded);
704            }
705        }
706        finally {
707            ir.close();
708            er.close();
709        }
710        return exitValue;
711    }
712
713    /**
714     * Reads the contents of a stream and stores them into the provided buffer.
715     *
716     * @param br The stream to be read.
717     * @param storageBuf The buffer into which the contents of the stream are to be stored.
718     * @param maxLength The maximum number of bytes to be stored in the buffer. An indicative value and may be
719     * exceeded.
720     * @param bytesRead The number of bytes read from this stream to date.
721     * @param readAll If true, the stream is drained while their is data available in it. Otherwise, only a single chunk
722     * of data is read, irrespective of how much is available.
723     * @return
724     * @throws IOException
725     */
726    private int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll)
727            throws IOException {
728        int bReadSession = 0;
729        if (br.ready()) {
730            char[] buf = new char[1024];
731            do {
732                int bReadCurrent = br.read(buf, 0, 1024);
733                if (storageBuf != null && bytesRead < maxLength) {
734                    storageBuf.append(buf, 0, bReadCurrent);
735                }
736                bReadSession += bReadCurrent;
737            } while (br.ready() && readAll);
738        }
739        return bReadSession;
740    }
741
742    /**
743     * Returns the first line from a StringBuffer, recognized by the new line character \n.
744     *
745     * @param buffer The StringBuffer from which the first line is required.
746     * @return The first line of the buffer.
747     */
748    private String getFirstLine(StringBuffer buffer) {
749        int newLineIndex = 0;
750        newLineIndex = buffer.indexOf("\n");
751        if (newLineIndex == -1) {
752            return buffer.toString();
753        }
754        else {
755            return buffer.substring(0, newLineIndex);
756        }
757    }
758}