001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.oozie.action.hadoop;
020    
021    import java.io.BufferedReader;
022    import java.io.BufferedWriter;
023    import java.io.File;
024    import java.io.FileWriter;
025    import java.io.IOException;
026    import java.io.InputStreamReader;
027    import java.util.ArrayList;
028    import java.util.HashSet;
029    import java.util.List;
030    import java.util.Map;
031    
032    import org.apache.hadoop.conf.Configuration;
033    import org.apache.hadoop.fs.Path;
034    
035    public class ShellMain extends LauncherMain {
036        public static final String CONF_OOZIE_SHELL_ARGS = "oozie.shell.args";
037        public static final String CONF_OOZIE_SHELL_EXEC = "oozie.shell.exec";
038        public static final String CONF_OOZIE_SHELL_ENVS = "oozie.shell.envs";
039        public static final String CONF_OOZIE_SHELL_CAPTURE_OUTPUT = "oozie.shell.capture-output";
040        public static final String OOZIE_ACTION_CONF_XML = "OOZIE_ACTION_CONF_XML";
041    
042        /**
043         * @param args Invoked from LauncherMapper:map()
044         * @throws Exception
045         */
046        public static void main(String[] args) throws Exception {
047            run(ShellMain.class, args);
048        }
049    
050        @Override
051        protected void run(String[] args) throws Exception {
052    
053            Configuration actionConf = loadActionConf();
054    
055            int exitCode = execute(actionConf);
056            if (exitCode != 0) {
057                // Shell command failed. therefore make the action failed
058                throw new LauncherMainException(1);
059            }
060    
061        }
062    
063        /**
064         * Execute the shell command
065         *
066         * @param actionConf
067         * @return command exit value
068         * @throws IOException
069         */
070        private int execute(Configuration actionConf) throws Exception {
071            String exec = getExec(actionConf);
072            List<String> args = getShellArguments(actionConf);
073            ArrayList<String> cmdArray = getCmdList(exec, args.toArray(new String[args.size()]));
074            ProcessBuilder builder = new ProcessBuilder(cmdArray);
075            Map<String, String> envp = getEnvMap(builder.environment(), actionConf);
076    
077            // Getting the Ccurrent working dir and setting it to processbuilder
078            File currDir = new File("dummy").getAbsoluteFile().getParentFile();
079            System.out.println("Current working dir " + currDir);
080            builder.directory(currDir);
081    
082            printCommand(cmdArray, envp); // For debugging purpose
083    
084            System.out.println("=================================================================");
085            System.out.println();
086            System.out.println(">>> Invoking Shell command line now >>");
087            System.out.println();
088            System.out.flush();
089    
090            boolean captureOutput = actionConf.getBoolean(CONF_OOZIE_SHELL_CAPTURE_OUTPUT, false);
091    
092            // Execute the Command
093            Process p = builder.start();
094    
095            Thread[] thrArray = handleShellOutput(p, captureOutput);
096    
097            int exitValue = p.waitFor();
098            // Wait for both the thread to exit
099            if (thrArray != null) {
100                for (Thread thr : thrArray) {
101                    thr.join();
102                }
103            }
104            System.out.println("Exit code of the Shell command " + exitValue);
105            System.out.println("<<< Invocation of Shell command completed <<<");
106            System.out.println();
107            return exitValue;
108        }
109    
110        /**
111         * Return the environment variable to pass to in shell command execution.
112         *
113         */
114        private Map<String, String> getEnvMap(Map<String, String> envp, Configuration actionConf) {
115            // Adding user-specified environments
116            String[] envs = MapReduceMain.getStrings(actionConf, CONF_OOZIE_SHELL_ENVS);
117            for (String env : envs) {
118                String[] varValue = env.split("=",2); // Error case is handled in
119                                                    // ShellActionExecutor
120                envp.put(varValue[0], varValue[1]);
121            }
122            // Adding action.xml to env
123            envp.put(OOZIE_ACTION_CONF_XML, System.getProperty("oozie.action.conf.xml", ""));
124            return envp;
125        }
126    
127        /**
128         * Get the shell commands with the arguments
129         *
130         * @param exec
131         * @param args
132         * @return command and list of args
133         */
134        private ArrayList<String> getCmdList(String exec, String[] args) {
135            ArrayList<String> cmdArray = new ArrayList<String>();
136            cmdArray.add(exec); // Main executable
137            for (String arg : args) { // Adding rest of the arguments
138                cmdArray.add(arg);
139            }
140            return cmdArray;
141        }
142    
143        /**
144         * Print the output written by the Shell execution in its stdout/stderr.
145         * Also write the stdout output to a file for capturing.
146         *
147         * @param p process
148         * @param captureOutput indicates if STDOUT should be captured or not.
149         * @return Array of threads (one for stdout and another one for stderr
150         *         processing
151         * @throws IOException thrown if an IO error occurrs.
152         */
153        protected Thread[] handleShellOutput(Process p, boolean captureOutput)
154                throws IOException {
155            BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
156            BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream()));
157    
158            OutputWriteThread thrStdout = new OutputWriteThread(input, true, captureOutput);
159            thrStdout.setDaemon(true);
160            thrStdout.start();
161    
162            OutputWriteThread thrStderr = new OutputWriteThread(error, false, false);
163            thrStderr.setDaemon(true);
164            thrStderr.start();
165    
166            return new Thread[]{ thrStdout, thrStderr };
167        }
168    
169        /**
170         * Thread to write output to LM stdout/stderr. Also write the content for
171         * capture-output.
172         */
173        class OutputWriteThread extends Thread {
174            BufferedReader reader = null;
175            boolean isStdout = false;
176            boolean needCaptured = false;
177    
178            public OutputWriteThread(BufferedReader reader, boolean isStdout, boolean needCaptured) {
179                this.reader = reader;
180                this.isStdout = isStdout;
181                this.needCaptured = needCaptured;
182            }
183    
184            @Override
185            public void run() {
186                String line;
187                BufferedWriter os = null;
188    
189                try {
190                    if (needCaptured) {
191                        File file = new File(System.getProperty("oozie.action.output.properties"));
192                        os = new BufferedWriter(new FileWriter(file));
193                    }
194                    while ((line = reader.readLine()) != null) {
195                        if (isStdout) { // For stdout
196                            // 1. Writing to LM STDOUT
197                            System.out.println("Stdoutput " + line);
198                            // 2. Writing for capture output
199                            if (os != null) {
200                                os.write(line);
201                                os.newLine();
202                            }
203                        }
204                        else {
205                            System.err.println(line); // 1. Writing to LM STDERR
206                        }
207                    }
208                }
209                catch (IOException e) {
210                    e.printStackTrace();
211                    throw new RuntimeException("Stdout/Stderr read/write error :" + e);
212                }finally {
213                    try {
214                        reader.close();
215                    }
216                    catch (IOException ex) {
217                        //NOP ignoring error on close of STDOUT/STDERR
218                    }
219                    if (os != null) {
220                        try {
221                            os.close();
222                        }
223                        catch (IOException e) {
224                            e.printStackTrace();
225                            throw new RuntimeException("Unable to close the file stream :" + e);
226                        }
227                    }
228                }
229            }
230        }
231    
232        /**
233         * Print the command including the arguments as well as the environment
234         * setup
235         *
236         * @param cmdArray :Command Array
237         * @param envp :Environment array
238         */
239        protected void printCommand(ArrayList<String> cmdArray, Map<String, String> envp) {
240            int i = 0;
241            System.out.println("Full Command .. ");
242            System.out.println("-------------------------");
243            for (String arg : cmdArray) {
244                System.out.println(i++ + ":" + arg + ":");
245            }
246    
247            if (envp != null) {
248                System.out.println("List of passing environment");
249                System.out.println("-------------------------");
250                for (Map.Entry<String, String> entry : envp.entrySet()) {
251                    System.out.println(entry.getKey() + "=" + entry.getValue() + ":");
252                }
253            }
254    
255        }
256    
257        /**
258         * Retrieve the list of arguments that were originally specified to
259         * Workflow.xml.
260         *
261         * @param actionConf
262         * @return argument list
263         */
264        protected List<String> getShellArguments(Configuration actionConf) {
265            List<String> arguments = new ArrayList<String>();
266            String[] scrArgs = MapReduceMain.getStrings(actionConf, CONF_OOZIE_SHELL_ARGS);
267            for (String scrArg : scrArgs) {
268                arguments.add(scrArg);
269            }
270            return arguments;
271        }
272    
273        /**
274         * Retrieve the executable name that was originally specified to
275         * Workflow.xml.
276         *
277         * @param actionConf
278         * @return executable
279         */
280        protected String getExec(Configuration actionConf) {
281            String exec = actionConf.get(CONF_OOZIE_SHELL_EXEC);
282    
283            if (exec == null) {
284                throw new RuntimeException("Action Configuration does not have " + CONF_OOZIE_SHELL_EXEC + " property");
285            }
286            return exec;
287        }
288    
289        /**
290         * Read action configuration passes through action xml file.
291         *
292         * @return action  Configuration
293         * @throws IOException
294         */
295        protected Configuration loadActionConf() throws IOException {
296            System.out.println();
297            System.out.println("Oozie Shell action configuration");
298            System.out.println("=================================================================");
299    
300            // loading action conf prepared by Oozie
301            Configuration actionConf = new Configuration(false);
302    
303            String actionXml = System.getProperty("oozie.action.conf.xml");
304    
305            if (actionXml == null) {
306                throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
307            }
308            if (!new File(actionXml).exists()) {
309                throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
310            }
311    
312            actionConf.addResource(new Path("file:///", actionXml));
313            logMasking("Shell configuration:", new HashSet<String>(), actionConf);
314            return actionConf;
315        }
316    }