001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.ssh;
019    
020    import java.io.BufferedReader;
021    import java.io.File;
022    import java.io.FileWriter;
023    import java.io.IOException;
024    import java.io.InputStreamReader;
025    import java.util.List;
026    import java.util.concurrent.Callable;
027    
028    import org.apache.oozie.client.WorkflowAction;
029    import org.apache.oozie.client.OozieClient;
030    import org.apache.oozie.client.WorkflowAction.Status;
031    import org.apache.oozie.action.ActionExecutor;
032    import org.apache.oozie.action.ActionExecutorException;
033    import org.apache.oozie.service.CallbackService;
034    import org.apache.oozie.servlet.CallbackServlet;
035    import org.apache.oozie.service.Services;
036    import org.apache.oozie.util.IOUtils;
037    import org.apache.oozie.util.PropertiesUtils;
038    import org.apache.oozie.util.XLog;
039    import org.apache.oozie.util.XmlUtils;
040    import org.jdom.Element;
041    import org.jdom.JDOMException;
042    import org.jdom.Namespace;
043    
044    /**
045     * Ssh action executor. <p/> <ul> <li>Execute the shell commands on the remote host</li> <li>Copies the base and wrapper
046     * scripts on to the remote location</li> <li>Base script is used to run the command on the remote host</li> <li>Wrapper
047     * script is used to check the status of the submitted command</li> <li>handles the submission failures</li> </ul>
048     */
049    public class SshActionExecutor extends ActionExecutor {
050        public static final String ACTION_TYPE = "ssh";
051    
052        /**
053         * Configuration parameter which specifies whether the specified ssh user is allowed, or has to be the job user.
054         */
055        public static final String CONF_SSH_ALLOW_USER_AT_HOST = CONF_PREFIX + "ssh.allow.user.at.host";
056    
057        protected static final String SSH_COMMAND_OPTIONS =
058                "-o PasswordAuthentication=no -o KbdInteractiveDevices=no -o StrictHostKeyChecking=no -o ConnectTimeout=20 ";
059    
060        protected static final String SSH_COMMAND_BASE = "ssh " + SSH_COMMAND_OPTIONS;
061        protected static final String SCP_COMMAND_BASE = "scp " + SSH_COMMAND_OPTIONS;
062    
063        public static final String ERR_SETUP_FAILED = "SETUP_FAILED";
064        public static final String ERR_EXECUTION_FAILED = "EXECUTION_FAILED";
065        public static final String ERR_UNKNOWN_ERROR = "UNKOWN_ERROR";
066        public static final String ERR_COULD_NOT_CONNECT = "COULD_NOT_CONNECT";
067        public static final String ERR_HOST_RESOLUTION = "COULD_NOT_RESOLVE_HOST";
068        public static final String ERR_FNF = "FNF";
069        public static final String ERR_AUTH_FAILED = "AUTH_FAILED";
070        public static final String ERR_NO_EXEC_PERM = "NO_EXEC_PERM";
071        public static final String ERR_USER_MISMATCH = "ERR_USER_MISMATCH";
072        public static final String ERR_EXCEDE_LEN = "ERR_OUTPUT_EXCEED_MAX_LEN";
073    
074        public static final String DELETE_TMP_DIR = "oozie.action.ssh.delete.remote.tmp.dir";
075    
076        public static final String HTTP_COMMAND = "oozie.action.ssh.http.command";
077    
078        public static final String HTTP_COMMAND_OPTIONS = "oozie.action.ssh.http.command.post.options";
079    
080        private static final String EXT_STATUS_VAR = "#status";
081    
082        private static int maxLen;
083        private static boolean allowSshUserAtHost;
084    
085        protected SshActionExecutor() {
086            super(ACTION_TYPE);
087        }
088    
089        /**
090         * Initialize Action.
091         */
092        @Override
093        public void initActionType() {
094            super.initActionType();
095            maxLen = getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN, 2 * 1024);
096            allowSshUserAtHost = getOozieConf().getBoolean(CONF_SSH_ALLOW_USER_AT_HOST, true);
097            registerError(InterruptedException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH001");
098            registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "SH002");
099            initSshScripts();
100        }
101    
102        /**
103         * Check ssh action status.
104         *
105         * @param context action execution context.
106         * @param action action object.
107         */
108        @Override
109        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
110            Status status = getActionStatus(context, action);
111            boolean captureOutput = false;
112            try {
113                Element eConf = XmlUtils.parseXml(action.getConf());
114                Namespace ns = eConf.getNamespace();
115                captureOutput = eConf.getChild("capture-output", ns) != null;
116            }
117            catch (JDOMException ex) {
118                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_XML_PARSE_FAILED",
119                                                  "unknown error", ex);
120            }
121            XLog log = XLog.getLog(getClass());
122            log.debug("Capture Output: {0}", captureOutput);
123            if (status == Status.OK) {
124                if (captureOutput) {
125                    String outFile = getRemoteFileName(context, action, "stdout", false, true);
126                    String dataCommand = SSH_COMMAND_BASE + action.getTrackerUri() + " cat " + outFile;
127                    log.debug("Ssh command [{0}]", dataCommand);
128                    try {
129                        Process process = Runtime.getRuntime().exec(dataCommand.split("\\s"));
130                        StringBuffer buffer = new StringBuffer();
131                        boolean overflow = false;
132                        drainBuffers(process, buffer, null, maxLen);
133                        if (buffer.length() > maxLen) {
134                            overflow = true;
135                        }
136                        if (overflow) {
137                            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR,
138                                                              "ERR_OUTPUT_EXCEED_MAX_LEN", "unknown error");
139                        }
140                        context.setExecutionData(status.toString(), PropertiesUtils.stringToProperties(buffer.toString()));
141                    }
142                    catch (Exception ex) {
143                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "ERR_UNKNOWN_ERROR",
144                                                          "unknown error", ex);
145                    }
146                }
147                else {
148                    context.setExecutionData(status.toString(), null);
149                }
150            }
151            else {
152                if (status == Status.ERROR) {
153                    context.setExecutionData(status.toString(), null);
154                }
155                else {
156                    context.setExternalStatus(status.toString());
157                }
158            }
159        }
160    
161        /**
162         * Kill ssh action.
163         *
164         * @param context action execution context.
165         * @param action object.
166         */
167        @Override
168        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
169            String command = "ssh " + action.getTrackerUri() + " kill  -KILL " + action.getExternalId();
170            int returnValue = getReturnValue(command);
171            if (returnValue != 0) {
172                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_TO_KILL", XLog.format(
173                        "Unable to kill process {0} on {1}", action.getExternalId(), action.getTrackerUri()));
174            }
175            context.setEndData(WorkflowAction.Status.KILLED, "ERROR");
176        }
177    
178        /**
179         * Start the ssh action execution.
180         *
181         * @param context action execution context.
182         * @param action action object.
183         */
184        @SuppressWarnings("unchecked")
185        @Override
186        public void start(final Context context, final WorkflowAction action) throws ActionExecutorException {
187            XLog log = XLog.getLog(getClass());
188            log.info("start() begins");
189            String confStr = action.getConf();
190            Element conf;
191            try {
192                conf = XmlUtils.parseXml(confStr);
193            }
194            catch (Exception ex) {
195                throw convertException(ex);
196            }
197            Namespace nameSpace = conf.getNamespace();
198            Element hostElement = conf.getChild("host", nameSpace);
199            String hostString = hostElement.getValue().trim();
200            hostString = prepareUserHost(hostString, context);
201            final String host = hostString;
202            final String dirLocation = execute(new Callable<String>() {
203                public String call() throws Exception {
204                    return setupRemote(host, context, action);
205                }
206    
207            });
208    
209            String runningPid = execute(new Callable<String>() {
210                public String call() throws Exception {
211                    return checkIfRunning(host, context, action);
212                }
213            });
214            String pid = "";
215    
216            if (runningPid == null) {
217                final Element commandElement = conf.getChild("command", nameSpace);
218                final boolean ignoreOutput = conf.getChild("capture-output", nameSpace) == null;
219    
220                if (commandElement != null) {
221                    List<Element> argsList = conf.getChildren("args", nameSpace);
222                    StringBuilder args = new StringBuilder("");
223                    if ((argsList != null) && (argsList.size() > 0)) {
224                        for (Element argsElement : argsList) {
225                            args = args.append(argsElement.getValue()).append(" ");
226                        }
227                        args.setLength(args.length() - 1);
228                    }
229                    final String argsString = args.toString();
230                    final String recoveryId = context.getRecoveryId();
231                    pid = execute(new Callable<String>() {
232    
233                        @Override
234                        public String call() throws Exception {
235                            return doExecute(host, dirLocation, commandElement.getValue(), argsString, ignoreOutput,
236                                             action, recoveryId);
237                        }
238    
239                    });
240                }
241                context.setStartData(pid, host, host);
242            }
243            else {
244                pid = runningPid;
245                context.setStartData(pid, host, host);
246                check(context, action);
247            }
248            log.info("start() ends");
249        }
250    
251        private String checkIfRunning(String host, final Context context, final WorkflowAction action) {
252            String pid = null;
253            String outFile = getRemoteFileName(context, action, "pid", false, false);
254            String getOutputCmd = SSH_COMMAND_BASE + host + " cat " + outFile;
255            try {
256                Process process = Runtime.getRuntime().exec(getOutputCmd.split("\\s"));
257                StringBuffer buffer = new StringBuffer();
258                drainBuffers(process, buffer, null, maxLen);
259                pid = getFirstLine(buffer);
260    
261                if (Long.valueOf(pid) > 0) {
262                    return pid;
263                }
264                else {
265                    return null;
266                }
267            }
268            catch (Exception e) {
269                return null;
270            }
271        }
272    
273        /**
274         * Get remote host working location.
275         *
276         * @param context action execution context
277         * @param action Action
278         * @param fileExtension Extension to be added to file name
279         * @param dirOnly Get the Directory only
280         * @param useExtId Flag to use external ID in the path
281         * @return remote host file name/Directory.
282         */
283        public String getRemoteFileName(Context context, WorkflowAction action, String fileExtension, boolean dirOnly,
284                                        boolean useExtId) {
285            String path = getActionDirPath(context.getWorkflow().getId(), action, ACTION_TYPE, false) + "/";
286            if (dirOnly) {
287                return path;
288            }
289            if (useExtId) {
290                path = path + action.getExternalId() + ".";
291            }
292            path = path + context.getRecoveryId() + "." + fileExtension;
293            return path;
294        }
295    
296        /**
297         * Utility method to execute command.
298         *
299         * @param command Command to execute as String.
300         * @return exit status of the execution.
301         * @throws IOException if process exits with status nonzero.
302         * @throws InterruptedException if process does not run properly.
303         */
304        public int executeCommand(String command) throws IOException, InterruptedException {
305            Runtime runtime = Runtime.getRuntime();
306            Process p = runtime.exec(command.split("\\s"));
307    
308            StringBuffer errorBuffer = new StringBuffer();
309            int exitValue = drainBuffers(p, null, errorBuffer, maxLen);
310    
311            String error = null;
312            if (exitValue != 0) {
313                error = getTruncatedString(errorBuffer);
314                throw new IOException(XLog.format("Not able to perform operation [{0}]", command) + " | " + "ErrorStream: "
315                        + error);
316            }
317            return exitValue;
318        }
319    
320        /**
321         * Do ssh action execution setup on remote host.
322         *
323         * @param host host name.
324         * @param context action execution context.
325         * @param action action object.
326         * @return remote host working directory.
327         * @throws IOException thrown if failed to setup.
328         * @throws InterruptedException thrown if any interruption happens.
329         */
330        protected String setupRemote(String host, Context context, WorkflowAction action) throws IOException, InterruptedException {
331            XLog log = XLog.getLog(getClass());
332            log.info("Attempting to copy ssh base scripts to remote host [{0}]", host);
333            String localDirLocation = Services.get().getRuntimeDir() + "/ssh";
334            if (localDirLocation.endsWith("/")) {
335                localDirLocation = localDirLocation.substring(0, localDirLocation.length() - 1);
336            }
337            File file = new File(localDirLocation + "/ssh-base.sh");
338            if (!file.exists()) {
339                throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
340            }
341            file = new File(localDirLocation + "/ssh-wrapper.sh");
342            if (!file.exists()) {
343                throw new IOException("Required Local file " + file.getAbsolutePath() + " not present.");
344            }
345            String remoteDirLocation = getRemoteFileName(context, action, null, true, true);
346            String command = XLog.format("{0}{1}  mkdir -p {2} ", SSH_COMMAND_BASE, host, remoteDirLocation).toString();
347            executeCommand(command);
348            command = XLog.format("{0}{1}/ssh-base.sh {2}/ssh-wrapper.sh {3}:{4}", SCP_COMMAND_BASE, localDirLocation,
349                                  localDirLocation, host, remoteDirLocation);
350            executeCommand(command);
351            command = XLog.format("{0}{1}  chmod +x {2}ssh-base.sh {3}ssh-wrapper.sh ", SSH_COMMAND_BASE, host,
352                                  remoteDirLocation, remoteDirLocation);
353            executeCommand(command);
354            return remoteDirLocation;
355        }
356    
357        /**
358         * Execute the ssh command.
359         *
360         * @param host hostname.
361         * @param dirLocation location of the base and wrapper scripts.
362         * @param cmnd command to be executed.
363         * @param args command arguments.
364         * @param ignoreOutput ignore output option.
365         * @param action action object.
366         * @param recoveryId action id + run number to enable recovery in rerun
367         * @return process id of the running command.
368         * @throws IOException thrown if failed to run the command.
369         * @throws InterruptedException thrown if any interruption happens.
370         */
371        protected String doExecute(String host, String dirLocation, String cmnd, String args, boolean ignoreOutput,
372                                   WorkflowAction action, String recoveryId) throws IOException, InterruptedException {
373            XLog log = XLog.getLog(getClass());
374            Runtime runtime = Runtime.getRuntime();
375            String callbackPost = ignoreOutput ? "_" : getOozieConf().get(HTTP_COMMAND_OPTIONS).replace(" ", "%%%");
376            // TODO check
377            String callBackUrl = Services.get().get(CallbackService.class)
378                    .createCallBackUrl(action.getId(), EXT_STATUS_VAR);
379            String command = XLog.format("{0}{1} {2}ssh-base.sh {3} \"{4}\" \"{5}\" {6} {7} {8} ", SSH_COMMAND_BASE, host,
380                                         dirLocation, getOozieConf().get(HTTP_COMMAND), callBackUrl, callbackPost, recoveryId, cmnd, args)
381                    .toString();
382            log.trace("Executing ssh command [{0}]", command);
383            Process p = runtime.exec(command.split("\\s"));
384            String pid = "";
385    
386            StringBuffer inputBuffer = new StringBuffer();
387            StringBuffer errorBuffer = new StringBuffer();
388            int exitValue = drainBuffers(p, inputBuffer, errorBuffer, maxLen);
389    
390            pid = getFirstLine(inputBuffer);
391    
392            String error = null;
393            if (exitValue != 0) {
394                error = getTruncatedString(errorBuffer);
395                throw new IOException(XLog.format("Not able to execute ssh-base.sh on {0}", host) + " | " + "ErrorStream: "
396                        + error);
397            }
398            return pid;
399        }
400    
401        /**
402         * End action execution.
403         *
404         * @param context action execution context.
405         * @param action action object.
406         * @throws ActionExecutorException thrown if action end execution fails.
407         */
408        public void end(final Context context, final WorkflowAction action) throws ActionExecutorException {
409            if (action.getExternalStatus().equals("OK")) {
410                context.setEndData(WorkflowAction.Status.OK, WorkflowAction.Status.OK.toString());
411            }
412            else {
413                context.setEndData(WorkflowAction.Status.ERROR, WorkflowAction.Status.ERROR.toString());
414            }
415            boolean deleteTmpDir = getOozieConf().getBoolean(DELETE_TMP_DIR, true);
416            if (deleteTmpDir) {
417                String tmpDir = getRemoteFileName(context, action, null, true, false);
418                String removeTmpDirCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " rm -rf " + tmpDir;
419                int retVal = getReturnValue(removeTmpDirCmd);
420                if (retVal != 0) {
421                    XLog.getLog(getClass()).warn("Cannot delete temp dir {0}", tmpDir);
422                }
423            }
424        }
425    
426        /**
427         * Get the return value of a process.
428         *
429         * @param command command to be executed.
430         * @return zero if execution is successful and any non zero value for failure.
431         * @throws ActionExecutorException
432         */
433        private int getReturnValue(String command) throws ActionExecutorException {
434            int returnValue;
435            Process ps = null;
436            try {
437                ps = Runtime.getRuntime().exec(command.split("\\s"));
438                returnValue = drainBuffers(ps, null, null, 0);
439            }
440            catch (IOException e) {
441                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog.format(
442                        "Not able to perform operation {0}", command), e);
443            }
444            finally {
445                ps.destroy();
446            }
447            return returnValue;
448        }
449    
450        /**
451         * Copy the ssh base and wrapper scripts to the local directory.
452         */
453        private void initSshScripts() {
454            String dirLocation = Services.get().getRuntimeDir() + "/ssh";
455            File path = new File(dirLocation);
456            if (!path.mkdirs()) {
457                throw new RuntimeException(XLog.format("Not able to create required directory {0}", dirLocation));
458            }
459            try {
460                IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-base.sh", -1), new FileWriter(dirLocation
461                        + "/ssh-base.sh"));
462                IOUtils.copyCharStream(IOUtils.getResourceAsReader("ssh-wrapper.sh", -1), new FileWriter(dirLocation
463                        + "/ssh-wrapper.sh"));
464            }
465            catch (IOException ie) {
466                throw new RuntimeException(XLog.format("Not able to copy required scripts file to {0} "
467                        + "for SshActionHandler", dirLocation));
468            }
469        }
470    
471        /**
472         * Get action status.
473         *
474         * @param action action object.
475         * @return status of the action(RUNNING/OK/ERROR).
476         * @throws ActionExecutorException thrown if there is any error in getting status.
477         */
478        protected Status getActionStatus(Context context, WorkflowAction action) throws ActionExecutorException {
479            String command = SSH_COMMAND_BASE + action.getTrackerUri() + " ps -p " + action.getExternalId();
480            Status aStatus;
481            int returnValue = getReturnValue(command);
482            if (returnValue == 0) {
483                aStatus = Status.RUNNING;
484            }
485            else {
486                String outFile = getRemoteFileName(context, action, "error", false, true);
487                String checkErrorCmd = SSH_COMMAND_BASE + action.getTrackerUri() + " ls " + outFile;
488                int retVal = getReturnValue(checkErrorCmd);
489                if (retVal == 0) {
490                    aStatus = Status.ERROR;
491                }
492                else {
493                    aStatus = Status.OK;
494                }
495            }
496            return aStatus;
497        }
498    
499        /**
500         * Execute the callable.
501         *
502         * @param callable required callable.
503         * @throws ActionExecutorException thrown if there is any error in command execution.
504         */
505        private <T> T execute(Callable<T> callable) throws ActionExecutorException {
506            XLog log = XLog.getLog(getClass());
507            try {
508                return callable.call();
509            }
510            catch (IOException ex) {
511                log.warn("Error while executing ssh EXECUTION");
512                String errorMessage = ex.getMessage();
513                if (null == errorMessage) { // Unknown IOException
514                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex
515                            .getMessage(), ex);
516                } // Host Resolution Issues
517                else {
518                    if (errorMessage.contains("Could not resolve hostname") ||
519                            errorMessage.contains("service not known")) {
520                        throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_HOST_RESOLUTION, ex
521                                .getMessage(), ex);
522                    } // Connection Timeout. Host temporarily down.
523                    else {
524                        if (errorMessage.contains("timed out")) {
525                            throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_COULD_NOT_CONNECT,
526                                                              ex.getMessage(), ex);
527                        }// Local ssh-base or ssh-wrapper missing
528                        else {
529                            if (errorMessage.contains("Required Local file")) {
530                                throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
531                                                                  ex.getMessage(), ex); // local_FNF
532                            }// Required oozie bash scripts missing, after the copy was
533                            // successful
534                            else {
535                                if (errorMessage.contains("No such file or directory")
536                                        && (errorMessage.contains("ssh-base") || errorMessage.contains("ssh-wrapper"))) {
537                                    throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, ERR_FNF,
538                                                                      ex.getMessage(), ex); // remote
539                                    // FNF
540                                } // Required application execution binary missing (either
541                                // caught by ssh-wrapper
542                                else {
543                                    if (errorMessage.contains("command not found")) {
544                                        throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_FNF, ex
545                                                .getMessage(), ex); // remote
546                                        // FNF
547                                    } // Permission denied while connecting
548                                    else {
549                                        if (errorMessage.contains("Permission denied")) {
550                                            throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_AUTH_FAILED, ex
551                                                    .getMessage(), ex);
552                                        } // Permission denied while executing
553                                        else {
554                                            if (errorMessage.contains(": Permission denied")) {
555                                                throw new ActionExecutorException(ActionExecutorException.ErrorType.NON_TRANSIENT, ERR_NO_EXEC_PERM, ex
556                                                        .getMessage(), ex);
557                                            }
558                                            else {
559                                                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_UNKNOWN_ERROR, ex
560                                                        .getMessage(), ex);
561                                            }
562                                        }
563                                    }
564                                }
565                            }
566                        }
567                    }
568                }
569            } // Any other type of exception
570            catch (Exception ex) {
571                throw convertException(ex);
572            }
573        }
574    
575        /**
576         * Checks whether the system is configured to always use the oozie user for ssh, and injects the user if required.
577         *
578         * @param host the host string.
579         * @param context the execution context.
580         * @return the modified host string with a user parameter added on if required.
581         * @throws ActionExecutorException in case the flag to use the oozie user is turned on and there is a mismatch
582         * between the user specified in the host and the oozie user.
583         */
584        private String prepareUserHost(String host, Context context) throws ActionExecutorException {
585            String oozieUser = context.getProtoActionConf().get(OozieClient.USER_NAME);
586            if (allowSshUserAtHost) {
587                if (!host.contains("@")) {
588                    host = oozieUser + "@" + host;
589                }
590            }
591            else {
592                if (host.contains("@")) {
593                    if (!host.toLowerCase().startsWith(oozieUser + "@")) {
594                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, ERR_USER_MISMATCH,
595                                                          XLog.format("user mismatch between oozie user [{0}] and ssh host [{1}]", oozieUser, host));
596                    }
597                }
598                else {
599                    host = oozieUser + "@" + host;
600                }
601            }
602            return host;
603        }
604    
605        public boolean isCompleted(String externalStatus) {
606            return true;
607        }
608    
609        /**
610         * Truncate the string to max length.
611         *
612         * @param strBuffer
613         * @return truncated string string
614         */
615        private String getTruncatedString(StringBuffer strBuffer) {
616    
617            if (strBuffer.length() <= maxLen) {
618                return strBuffer.toString();
619            }
620            else {
621                return strBuffer.substring(0, maxLen);
622            }
623        }
624    
625        /**
626         * Drains the inputStream and errorStream of the Process being executed. The contents of the streams are stored if a
627         * buffer is provided for the stream.
628         *
629         * @param p The Process instance.
630         * @param inputBuffer The buffer into which STDOUT is to be read. Can be null if only draining is required.
631         * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required.
632         * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the
633         * store content may exceed this length.
634         * @return the exit value of the process.
635         * @throws IOException
636         */
637        private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength)
638                throws IOException {
639            int exitValue = -1;
640            BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream()));
641            BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream()));
642    
643            int inBytesRead = 0;
644            int errBytesRead = 0;
645    
646            boolean processEnded = false;
647    
648            try {
649                while (!processEnded) {
650                    try {
651                        exitValue = p.exitValue();
652                        processEnded = true;
653                    }
654                    catch (IllegalThreadStateException ex) {
655                        // Continue to drain.
656                    }
657    
658                    inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded);
659                    errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded);
660                }
661            }
662            finally {
663                ir.close();
664                er.close();
665            }
666            return exitValue;
667        }
668    
669        /**
670         * Reads the contents of a stream and stores them into the provided buffer.
671         *
672         * @param br The stream to be read.
673         * @param storageBuf The buffer into which the contents of the stream are to be stored.
674         * @param maxLength The maximum number of bytes to be stored in the buffer. An indicative value and may be
675         * exceeded.
676         * @param bytesRead The number of bytes read from this stream to date.
677         * @param readAll If true, the stream is drained while their is data available in it. Otherwise, only a single chunk
678         * of data is read, irrespective of how much is available.
679         * @return
680         * @throws IOException
681         */
682        private int drainBuffer(BufferedReader br, StringBuffer storageBuf, int maxLength, int bytesRead, boolean readAll)
683                throws IOException {
684            int bReadSession = 0;
685            if (br.ready()) {
686                char[] buf = new char[1024];
687                do {
688                    int bReadCurrent = br.read(buf, 0, 1024);
689                    if (storageBuf != null && bytesRead < maxLength) {
690                        storageBuf.append(buf, 0, bReadCurrent);
691                    }
692                    bReadSession += bReadCurrent;
693                } while (br.ready() && readAll);
694            }
695            return bReadSession;
696        }
697    
698        /**
699         * Returns the first line from a StringBuffer, recognized by the new line character \n.
700         *
701         * @param buffer The StringBuffer from which the first line is required.
702         * @return The first line of the buffer.
703         */
704        private String getFirstLine(StringBuffer buffer) {
705            int newLineIndex = 0;
706            newLineIndex = buffer.indexOf("\n");
707            if (newLineIndex == -1) {
708                return buffer.toString();
709            }
710            else {
711                return buffer.substring(0, newLineIndex);
712            }
713        }
714    }