This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.oozie.action.hadoop;
020
021 import java.io.BufferedReader;
022 import java.io.BufferedWriter;
023 import java.io.File;
024 import java.io.FileWriter;
025 import java.io.IOException;
026 import java.io.InputStreamReader;
027 import java.util.ArrayList;
028 import java.util.HashSet;
029 import java.util.List;
030 import java.util.Map;
031
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.hadoop.fs.Path;
034
035 public class ShellMain extends LauncherMain {
036 public static final String CONF_OOZIE_SHELL_ARGS = "oozie.shell.args";
037 public static final String CONF_OOZIE_SHELL_EXEC = "oozie.shell.exec";
038 public static final String CONF_OOZIE_SHELL_ENVS = "oozie.shell.envs";
039 public static final String CONF_OOZIE_SHELL_CAPTURE_OUTPUT = "oozie.shell.capture-output";
040 public static final String OOZIE_ACTION_CONF_XML = "OOZIE_ACTION_CONF_XML";
041
042 /**
043 * @param args Invoked from LauncherMapper:map()
044 * @throws Exception
045 */
046 public static void main(String[] args) throws Exception {
047 run(ShellMain.class, args);
048 }
049
050 @Override
051 protected void run(String[] args) throws Exception {
052
053 Configuration actionConf = loadActionConf();
054
055 int exitCode = execute(actionConf);
056 if (exitCode != 0) {
057 // Shell command failed. therefore make the action failed
058 throw new LauncherMainException(1);
059 }
060
061 }
062
063 /**
064 * Execute the shell command
065 *
066 * @param actionConf
067 * @return command exit value
068 * @throws IOException
069 */
070 private int execute(Configuration actionConf) throws Exception {
071 String exec = getExec(actionConf);
072 List<String> args = getShellArguments(actionConf);
073 ArrayList<String> cmdArray = getCmdList(exec, args.toArray(new String[args.size()]));
074 ProcessBuilder builder = new ProcessBuilder(cmdArray);
075 Map<String, String> envp = getEnvMap(builder.environment(), actionConf);
076
077 // Getting the Ccurrent working dir and setting it to processbuilder
078 File currDir = new File("dummy").getAbsoluteFile().getParentFile();
079 System.out.println("Current working dir " + currDir);
080 builder.directory(currDir);
081
082 printCommand(cmdArray, envp); // For debugging purpose
083
084 System.out.println("=================================================================");
085 System.out.println();
086 System.out.println(">>> Invoking Shell command line now >>");
087 System.out.println();
088 System.out.flush();
089
090 boolean captureOutput = actionConf.getBoolean(CONF_OOZIE_SHELL_CAPTURE_OUTPUT, false);
091
092 // Execute the Command
093 Process p = builder.start();
094
095 Thread[] thrArray = handleShellOutput(p, captureOutput);
096
097 int exitValue = p.waitFor();
098 // Wait for both the thread to exit
099 if (thrArray != null) {
100 for (Thread thr : thrArray) {
101 thr.join();
102 }
103 }
104 System.out.println("Exit code of the Shell command " + exitValue);
105 System.out.println("<<< Invocation of Shell command completed <<<");
106 System.out.println();
107 return exitValue;
108 }
109
110 /**
111 * Return the environment variable to pass to in shell command execution.
112 *
113 */
114 private Map<String, String> getEnvMap(Map<String, String> envp, Configuration actionConf) {
115 // Adding user-specified environments
116 String[] envs = MapReduceMain.getStrings(actionConf, CONF_OOZIE_SHELL_ENVS);
117 for (String env : envs) {
118 String[] varValue = env.split("=",2); // Error case is handled in
119 // ShellActionExecutor
120 envp.put(varValue[0], varValue[1]);
121 }
122 // Adding action.xml to env
123 envp.put(OOZIE_ACTION_CONF_XML, System.getProperty("oozie.action.conf.xml", ""));
124 return envp;
125 }
126
127 /**
128 * Get the shell commands with the arguments
129 *
130 * @param exec
131 * @param args
132 * @return command and list of args
133 */
134 private ArrayList<String> getCmdList(String exec, String[] args) {
135 ArrayList<String> cmdArray = new ArrayList<String>();
136 cmdArray.add(exec); // Main executable
137 for (String arg : args) { // Adding rest of the arguments
138 cmdArray.add(arg);
139 }
140 return cmdArray;
141 }
142
143 /**
144 * Print the output written by the Shell execution in its stdout/stderr.
145 * Also write the stdout output to a file for capturing.
146 *
147 * @param p process
148 * @param captureOutput indicates if STDOUT should be captured or not.
149 * @return Array of threads (one for stdout and another one for stderr
150 * processing
151 * @throws IOException thrown if an IO error occurrs.
152 */
153 protected Thread[] handleShellOutput(Process p, boolean captureOutput)
154 throws IOException {
155 BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
156 BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream()));
157
158 OutputWriteThread thrStdout = new OutputWriteThread(input, true, captureOutput);
159 thrStdout.setDaemon(true);
160 thrStdout.start();
161
162 OutputWriteThread thrStderr = new OutputWriteThread(error, false, false);
163 thrStderr.setDaemon(true);
164 thrStderr.start();
165
166 return new Thread[]{ thrStdout, thrStderr };
167 }
168
169 /**
170 * Thread to write output to LM stdout/stderr. Also write the content for
171 * capture-output.
172 */
173 class OutputWriteThread extends Thread {
174 BufferedReader reader = null;
175 boolean isStdout = false;
176 boolean needCaptured = false;
177
178 public OutputWriteThread(BufferedReader reader, boolean isStdout, boolean needCaptured) {
179 this.reader = reader;
180 this.isStdout = isStdout;
181 this.needCaptured = needCaptured;
182 }
183
184 @Override
185 public void run() {
186 String line;
187 BufferedWriter os = null;
188
189 try {
190 if (needCaptured) {
191 File file = new File(System.getProperty("oozie.action.output.properties"));
192 os = new BufferedWriter(new FileWriter(file));
193 }
194 while ((line = reader.readLine()) != null) {
195 if (isStdout) { // For stdout
196 // 1. Writing to LM STDOUT
197 System.out.println("Stdoutput " + line);
198 // 2. Writing for capture output
199 if (os != null) {
200 os.write(line);
201 os.newLine();
202 }
203 }
204 else {
205 System.err.println(line); // 1. Writing to LM STDERR
206 }
207 }
208 }
209 catch (IOException e) {
210 e.printStackTrace();
211 throw new RuntimeException("Stdout/Stderr read/write error :" + e);
212 }finally {
213 try {
214 reader.close();
215 }
216 catch (IOException ex) {
217 //NOP ignoring error on close of STDOUT/STDERR
218 }
219 if (os != null) {
220 try {
221 os.close();
222 }
223 catch (IOException e) {
224 e.printStackTrace();
225 throw new RuntimeException("Unable to close the file stream :" + e);
226 }
227 }
228 }
229 }
230 }
231
232 /**
233 * Print the command including the arguments as well as the environment
234 * setup
235 *
236 * @param cmdArray :Command Array
237 * @param envp :Environment array
238 */
239 protected void printCommand(ArrayList<String> cmdArray, Map<String, String> envp) {
240 int i = 0;
241 System.out.println("Full Command .. ");
242 System.out.println("-------------------------");
243 for (String arg : cmdArray) {
244 System.out.println(i++ + ":" + arg + ":");
245 }
246
247 if (envp != null) {
248 System.out.println("List of passing environment");
249 System.out.println("-------------------------");
250 for (Map.Entry<String, String> entry : envp.entrySet()) {
251 System.out.println(entry.getKey() + "=" + entry.getValue() + ":");
252 }
253 }
254
255 }
256
257 /**
258 * Retrieve the list of arguments that were originally specified to
259 * Workflow.xml.
260 *
261 * @param actionConf
262 * @return argument list
263 */
264 protected List<String> getShellArguments(Configuration actionConf) {
265 List<String> arguments = new ArrayList<String>();
266 String[] scrArgs = MapReduceMain.getStrings(actionConf, CONF_OOZIE_SHELL_ARGS);
267 for (String scrArg : scrArgs) {
268 arguments.add(scrArg);
269 }
270 return arguments;
271 }
272
273 /**
274 * Retrieve the executable name that was originally specified to
275 * Workflow.xml.
276 *
277 * @param actionConf
278 * @return executable
279 */
280 protected String getExec(Configuration actionConf) {
281 String exec = actionConf.get(CONF_OOZIE_SHELL_EXEC);
282
283 if (exec == null) {
284 throw new RuntimeException("Action Configuration does not have " + CONF_OOZIE_SHELL_EXEC + " property");
285 }
286 return exec;
287 }
288
289 /**
290 * Read action configuration passes through action xml file.
291 *
292 * @return action Configuration
293 * @throws IOException
294 */
295 protected Configuration loadActionConf() throws IOException {
296 System.out.println();
297 System.out.println("Oozie Shell action configuration");
298 System.out.println("=================================================================");
299
300 // loading action conf prepared by Oozie
301 Configuration actionConf = new Configuration(false);
302
303 String actionXml = System.getProperty("oozie.action.conf.xml");
304
305 if (actionXml == null) {
306 throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
307 }
308 if (!new File(actionXml).exists()) {
309 throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
310 }
311
312 actionConf.addResource(new Path("file:///", actionXml));
313 logMasking("Shell configuration:", new HashSet<String>(), actionConf);
314 return actionConf;
315 }
316 }