This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.BufferedReader;
021 import java.io.ByteArrayOutputStream;
022 import java.io.File;
023 import java.io.FileNotFoundException;
024 import java.io.IOException;
025 import java.io.InputStream;
026 import java.io.InputStreamReader;
027 import java.io.PrintStream;
028 import java.io.StringReader;
029 import java.net.ConnectException;
030 import java.net.URI;
031 import java.net.URISyntaxException;
032 import java.net.UnknownHostException;
033 import java.util.ArrayList;
034 import java.util.HashMap;
035 import java.util.HashSet;
036 import java.util.Iterator;
037 import java.util.List;
038 import java.util.Map;
039 import java.util.Properties;
040 import java.util.Set;
041 import java.util.Map.Entry;
042
043 import org.apache.hadoop.conf.Configuration;
044 import org.apache.hadoop.filecache.DistributedCache;
045 import org.apache.hadoop.fs.FileStatus;
046 import org.apache.hadoop.fs.FileSystem;
047 import org.apache.hadoop.fs.Path;
048 import org.apache.hadoop.fs.permission.AccessControlException;
049 import org.apache.hadoop.mapred.JobClient;
050 import org.apache.hadoop.mapred.JobConf;
051 import org.apache.hadoop.mapred.JobID;
052 import org.apache.hadoop.mapred.RunningJob;
053 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
054 import org.apache.hadoop.util.DiskChecker;
055 import org.apache.oozie.WorkflowActionBean;
056 import org.apache.oozie.WorkflowJobBean;
057 import org.apache.oozie.action.ActionExecutor;
058 import org.apache.oozie.action.ActionExecutorException;
059 import org.apache.oozie.client.OozieClient;
060 import org.apache.oozie.client.WorkflowAction;
061 import org.apache.oozie.service.HadoopAccessorException;
062 import org.apache.oozie.service.HadoopAccessorService;
063 import org.apache.oozie.service.Services;
064 import org.apache.oozie.service.URIHandlerService;
065 import org.apache.oozie.service.WorkflowAppService;
066 import org.apache.oozie.servlet.CallbackServlet;
067 import org.apache.oozie.util.ELEvaluator;
068 import org.apache.oozie.util.IOUtils;
069 import org.apache.oozie.util.PropertiesUtils;
070 import org.apache.oozie.util.XConfiguration;
071 import org.apache.oozie.util.XLog;
072 import org.apache.oozie.util.XmlUtils;
073 import org.jdom.Element;
074 import org.jdom.JDOMException;
075 import org.jdom.Namespace;
076 import org.apache.hadoop.security.token.Token;
077 import org.apache.hadoop.security.token.TokenIdentifier;
078
079 public class JavaActionExecutor extends ActionExecutor {
080
081 private static final String HADOOP_USER = "user.name";
082 public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
083 public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
084 public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
085 public static final String HADOOP_NAME_NODE = "fs.default.name";
086 private static final String HADOOP_JOB_NAME = "mapred.job.name";
087 public static final String OOZIE_COMMON_LIBDIR = "oozie";
088 public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;
089 private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
090 public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
091 public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
092 public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
093 private static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
094 public static final String OOZIE_ACTION_SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar";
095 private boolean useLauncherJar;
096 private static int maxActionOutputLen;
097 private static int maxExternalStatsSize;
098
099 private static final String SUCCEEDED = "SUCCEEDED";
100 private static final String KILLED = "KILLED";
101 private static final String FAILED = "FAILED";
102 private static final String FAILED_KILLED = "FAILED/KILLED";
103 private static final String RUNNING = "RUNNING";
104 protected XLog log = XLog.getLog(getClass());
105
106 static {
107 DISALLOWED_PROPERTIES.add(HADOOP_USER);
108 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
109 DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
110 DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
111 DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
112 }
113
114 public JavaActionExecutor() {
115 this("java");
116 }
117
118 protected JavaActionExecutor(String type) {
119 super(type);
120 requiresNNJT = true;
121 useLauncherJar = getOozieConf().getBoolean(OOZIE_ACTION_SHIP_LAUNCHER_JAR, true);
122 }
123
124 protected String getLauncherJarName() {
125 return getType() + "-launcher.jar";
126 }
127
128 protected List<Class> getLauncherClasses() {
129 List<Class> classes = new ArrayList<Class>();
130 classes.add(LauncherMapper.class);
131 classes.add(LauncherSecurityManager.class);
132 classes.add(LauncherException.class);
133 classes.add(LauncherMainException.class);
134 classes.add(PrepareActionsDriver.class);
135 classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
136 classes.add(ActionStats.class);
137 classes.add(ActionType.class);
138 return classes;
139 }
140
141 @Override
142 public void initActionType() {
143 XLog log = XLog.getLog(getClass());
144 super.initActionType();
145 maxActionOutputLen = getOozieConf()
146 .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
147 // TODO: Remove the below config get in a subsequent release..
148 // This other irrelevant property is only used to
149 // preserve backwards compatibility cause of a typo.
150 // See OOZIE-4.
151 getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN,
152 2 * 1024));
153 //Get the limit for the maximum allowed size of action stats
154 maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT);
155 maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
156
157 createLauncherJar();
158
159 registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
160 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
161 "JA002");
162 registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
163 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
164 registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
165 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
166 registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
167 ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
168 registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, " JA006");
169 registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
170 registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
171 registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
172 }
173
174 public void createLauncherJar() {
175 if (useLauncherJar) {
176 try {
177 List<Class> classes = getLauncherClasses();
178 Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
179 IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
180 }
181 catch (IOException ex) {
182 throw new RuntimeException(ex);
183 }
184 catch (java.lang.NoClassDefFoundError err) {
185 ByteArrayOutputStream baos = new ByteArrayOutputStream();
186 err.printStackTrace(new PrintStream(baos));
187 log.warn(baos.toString());
188 }
189 }
190 }
191
192 /**
193 * Get the maximum allowed size of stats
194 *
195 * @return maximum size of stats
196 */
197 public static int getMaxExternalStatsSize() {
198 return maxExternalStatsSize;
199 }
200
201 static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
202 for (String prop : DISALLOWED_PROPERTIES) {
203 if (conf.get(prop) != null) {
204 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
205 "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
206 }
207 }
208 }
209
210 public JobConf createBaseHadoopConf(Context context, Element actionXml) {
211 Namespace ns = actionXml.getNamespace();
212 String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
213 String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
214 JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
215 conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
216 conf.set(HADOOP_JOB_TRACKER, jobTracker);
217 conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
218 conf.set(HADOOP_YARN_RM, jobTracker);
219 conf.set(HADOOP_NAME_NODE, nameNode);
220 conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
221 return conf;
222 }
223
224 private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
225 for (Map.Entry<String, String> entry : srcConf) {
226 if (entry.getKey().startsWith("oozie.launcher.")) {
227 String name = entry.getKey().substring("oozie.launcher.".length());
228 String value = entry.getValue();
229 // setting original KEY
230 launcherConf.set(entry.getKey(), value);
231 // setting un-prefixed key (to allow Hadoop job config
232 // for the launcher job
233 launcherConf.set(name, value);
234 }
235 }
236 }
237
238 Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
239 throws ActionExecutorException {
240 try {
241 Namespace ns = actionXml.getNamespace();
242 Element e = actionXml.getChild("configuration", ns);
243 if (e != null) {
244 String strConf = XmlUtils.prettyPrint(e).toString();
245 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
246
247 XConfiguration launcherConf = new XConfiguration();
248 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
249 XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
250 injectLauncherProperties(actionDefaultConf, launcherConf);
251 injectLauncherProperties(inlineConf, launcherConf);
252 injectLauncherUseUberMode(launcherConf);
253 checkForDisallowedProps(launcherConf, "launcher configuration");
254 XConfiguration.copy(launcherConf, conf);
255 }
256 return conf;
257 }
258 catch (IOException ex) {
259 throw convertException(ex);
260 }
261 }
262
263 void injectLauncherUseUberMode(Configuration launcherConf) {
264 // Set Uber Mode for the launcher (YARN only, ignored by MR1) if not set by action conf and not disabled in oozie-site
265 if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
266 if (getOozieConf().getBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable", false)) {
267 launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
268 }
269 }
270 }
271
272 public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
273 throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
274 Namespace ns = element.getNamespace();
275 Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
276 while (it.hasNext()) {
277 Element e = it.next();
278 String jobXml = e.getTextTrim();
279 Path path = new Path(appPath, jobXml);
280 FileSystem fs = context.getAppFileSystem();
281 Configuration jobXmlConf = new XConfiguration(fs.open(path));
282 checkForDisallowedProps(jobXmlConf, "job-xml");
283 XConfiguration.copy(jobXmlConf, conf);
284 }
285 Element e = element.getChild("configuration", ns);
286 if (e != null) {
287 String strConf = XmlUtils.prettyPrint(e).toString();
288 XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
289 checkForDisallowedProps(inlineConf, "inline configuration");
290 XConfiguration.copy(inlineConf, conf);
291 }
292 }
293
294 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
295 throws ActionExecutorException {
296 try {
297 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
298 XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
299 XConfiguration.injectDefaults(actionDefaults, actionConf);
300
301 has.checkSupportedFilesystem(appPath.toUri());
302
303 parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);
304 return actionConf;
305 }
306 catch (IOException ex) {
307 throw convertException(ex);
308 }
309 catch (HadoopAccessorException ex) {
310 throw convertException(ex);
311 }
312 catch (URISyntaxException ex) {
313 throw convertException(ex);
314 }
315 }
316
317 Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
318 throws ActionExecutorException {
319
320 URI uri = null;
321 try {
322 uri = new URI(filePath);
323 URI baseUri = appPath.toUri();
324 if (uri.getScheme() == null) {
325 String resolvedPath = uri.getPath();
326 if (!resolvedPath.startsWith("/")) {
327 resolvedPath = baseUri.getPath() + "/" + resolvedPath;
328 }
329 uri = new URI(baseUri.getScheme(), baseUri.getAuthority(), resolvedPath, uri.getQuery(), uri.getFragment());
330 }
331 if (archive) {
332 DistributedCache.addCacheArchive(uri.normalize(), conf);
333 }
334 else {
335 String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
336 if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files
337 uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
338 DistributedCache.addCacheFile(uri.normalize(), conf);
339 }
340 else if (fileName.endsWith(".jar")) { // .jar files
341 if (!fileName.contains("#")) {
342 String user = conf.get("user.name");
343 Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, new Path(uri.normalize()), conf);
344 }
345 else {
346 DistributedCache.addCacheFile(uri, conf);
347 }
348 }
349 else { // regular files
350 if (!fileName.contains("#")) {
351 uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
352 }
353 DistributedCache.addCacheFile(uri.normalize(), conf);
354 }
355 }
356 DistributedCache.createSymlink(conf);
357 return conf;
358 }
359 catch (Exception ex) {
360 XLog.getLog(getClass()).debug(
361 "Errors when add to DistributedCache. Path=" + uri.toString() + ", archive=" + archive + ", conf="
362 + XmlUtils.prettyPrint(conf).toString());
363 throw convertException(ex);
364 }
365 }
366
367 String getOozieLauncherJar(Context context) throws ActionExecutorException {
368 try {
369 return new Path(context.getActionDir(), getLauncherJarName()).toString();
370 }
371 catch (Exception ex) {
372 throw convertException(ex);
373 }
374 }
375
376 public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
377 try {
378 Path actionDir = context.getActionDir();
379 Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
380 if (!actionFs.exists(actionDir)) {
381 try {
382 if (useLauncherJar) {
383 actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
384 tempActionDir, getLauncherJarName()));
385 }
386 else {
387 actionFs.mkdirs(tempActionDir);
388 }
389 actionFs.rename(tempActionDir, actionDir);
390 }
391 catch (IOException ex) {
392 actionFs.delete(tempActionDir, true);
393 actionFs.delete(actionDir, true);
394 throw ex;
395 }
396 }
397 }
398 catch (Exception ex) {
399 throw convertException(ex);
400 }
401 }
402
403 void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
404 try {
405 Path actionDir = context.getActionDir();
406 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
407 && actionFs.exists(actionDir)) {
408 actionFs.delete(actionDir, true);
409 }
410 }
411 catch (Exception ex) {
412 throw convertException(ex);
413 }
414 }
415
416 protected void addShareLib(Path appPath, Configuration conf, String[] actionShareLibNames)
417 throws ActionExecutorException {
418 if (actionShareLibNames != null) {
419 for (String actionShareLibName : actionShareLibNames) {
420 try {
421 Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
422 if (systemLibPath != null) {
423 Path actionLibPath = new Path(systemLibPath, actionShareLibName.trim());
424 String user = conf.get("user.name");
425 FileSystem fs;
426 // If the actionLibPath has a valid scheme and authority, then use them to
427 // determine the filesystem that the sharelib resides on; otherwise, assume
428 // it resides on the same filesystem as the appPath and use the appPath to
429 // determine the filesystem
430 if (actionLibPath.toUri().getScheme() != null && actionLibPath.toUri().getAuthority() != null) {
431 fs = Services.get().get(HadoopAccessorService.class)
432 .createFileSystem(user, actionLibPath.toUri(), conf);
433 }
434 else {
435 fs = Services.get().get(HadoopAccessorService.class)
436 .createFileSystem(user, appPath.toUri(), conf);
437 }
438 if (fs.exists(actionLibPath)) {
439 FileStatus[] files = fs.listStatus(actionLibPath);
440 for (FileStatus file : files) {
441 addToCache(conf, actionLibPath, file.getPath().toUri().getPath(), false);
442 }
443 }
444 }
445 }
446 catch (HadoopAccessorException ex) {
447 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, ex.getErrorCode()
448 .toString(), ex.getMessage());
449 }
450 catch (IOException ex) {
451 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
452 "It should never happen", ex.getMessage());
453 }
454 }
455 }
456 }
457
458 protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException {
459 String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath");
460 if (actionLibsStrArr != null) {
461 try {
462 for (String actionLibsStr : actionLibsStrArr) {
463 actionLibsStr = actionLibsStr.trim();
464 if (actionLibsStr.length() > 0)
465 {
466 Path actionLibsPath = new Path(actionLibsStr);
467 String user = conf.get("user.name");
468 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
469 if (fs.exists(actionLibsPath)) {
470 FileStatus[] files = fs.listStatus(actionLibsPath);
471 for (FileStatus file : files) {
472 addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
473 }
474 }
475 }
476 }
477 }
478 catch (HadoopAccessorException ex){
479 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
480 ex.getErrorCode().toString(), ex.getMessage());
481 }
482 catch (IOException ex){
483 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
484 "It should never happen", ex.getMessage());
485 }
486 }
487 }
488
489 @SuppressWarnings("unchecked")
490 void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
491 throws ActionExecutorException {
492 Configuration proto = context.getProtoActionConf();
493
494 // launcher JAR
495 if (useLauncherJar) {
496 addToCache(conf, appPath, getOozieLauncherJar(context), false);
497 }
498
499 // Workflow lib/
500 String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
501 if (paths != null) {
502 for (String path : paths) {
503 addToCache(conf, appPath, path, false);
504 }
505 }
506
507 // Action libs
508 addActionLibs(appPath, conf);
509
510 // files and archives defined in the action
511 for (Element eProp : (List<Element>) actionXml.getChildren()) {
512 if (eProp.getName().equals("file")) {
513 String[] filePaths = eProp.getTextTrim().split(",");
514 for (String path : filePaths) {
515 addToCache(conf, appPath, path.trim(), false);
516 }
517 }
518 else if (eProp.getName().equals("archive")) {
519 String[] archivePaths = eProp.getTextTrim().split(",");
520 for (String path : archivePaths){
521 addToCache(conf, appPath, path.trim(), true);
522 }
523 }
524 }
525
526 addAllShareLibs(appPath, conf, context, actionXml);
527 }
528
529 // Adds action specific share libs and common share libs
530 private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
531 throws ActionExecutorException {
532 // Add action specific share libs
533 addActionShareLib(appPath, conf, context, actionXml);
534 // Add common sharelibs for Oozie
535 addShareLib(appPath, conf, new String[]{JavaActionExecutor.OOZIE_COMMON_LIBDIR});
536 }
537
538 private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml)
539 throws ActionExecutorException {
540 XConfiguration wfJobConf = null;
541 try {
542 wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
543 }
544 catch (IOException ioe) {
545 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
546 ioe.getMessage());
547 }
548 // Action sharelibs are only added if user has specified to use system libpath
549 if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
550 // add action specific sharelibs
551 addShareLib(appPath, conf, getShareLibNames(context, actionXml, conf));
552 }
553 }
554
555
556 protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
557 Namespace ns = actionXml.getNamespace();
558 Element e = actionXml.getChild("main-class", ns);
559 return e.getTextTrim();
560 }
561
562 private static final String QUEUE_NAME = "mapred.job.queue.name";
563
564 private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();
565
566 static {
567 SPECIAL_PROPERTIES.add(QUEUE_NAME);
568 SPECIAL_PROPERTIES.add(ACL_VIEW_JOB);
569 SPECIAL_PROPERTIES.add(ACL_MODIFY_JOB);
570 }
571
572 @SuppressWarnings("unchecked")
573 JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
574 throws ActionExecutorException {
575 try {
576
577 // app path could be a file
578 Path appPathRoot = new Path(context.getWorkflow().getAppPath());
579 if (actionFs.isFile(appPathRoot)) {
580 appPathRoot = appPathRoot.getParent();
581 }
582
583 // launcher job configuration
584 JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
585 setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
586
587 String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
588 if (actionShareLibProperty != null) {
589 launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
590 }
591 setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
592
593 String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
594 if (jobName == null || jobName.isEmpty()) {
595 jobName = XLog.format(
596 "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
597 context.getWorkflow().getAppName(), action.getName(),
598 context.getWorkflow().getId());
599 launcherJobConf.setJobName(jobName);
600 }
601
602 String jobId = context.getWorkflow().getId();
603 String actionId = action.getId();
604 Path actionDir = context.getActionDir();
605 String recoveryId = context.getRecoveryId();
606
607 // Getting the prepare XML from the action XML
608 Namespace ns = actionXml.getNamespace();
609 Element prepareElement = actionXml.getChild("prepare", ns);
610 String prepareXML = "";
611 if (prepareElement != null) {
612 if (prepareElement.getChildren().size() > 0) {
613 prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
614 }
615 }
616 LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
617 prepareXML);
618
619 LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
620 LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
621 LauncherMapperHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
622 LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
623
624 List<Element> list = actionXml.getChildren("arg", ns);
625 String[] args = new String[list.size()];
626 for (int i = 0; i < list.size(); i++) {
627 args[i] = list.get(i).getTextTrim();
628 }
629 LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
630
631 List<Element> javaopts = actionXml.getChildren("java-opt", ns);
632 for (Element opt: javaopts) {
633 String opts = launcherJobConf.get("mapred.child.java.opts", "");
634 opts = opts + " " + opt.getTextTrim();
635 opts = opts.trim();
636 launcherJobConf.set("mapred.child.java.opts", opts);
637 }
638
639 Element opt = actionXml.getChild("java-opts", ns);
640 if (opt != null) {
641 String opts = launcherJobConf.get("mapred.child.java.opts", "");
642 opts = opts + " " + opt.getTextTrim();
643 opts = opts.trim();
644 launcherJobConf.set("mapred.child.java.opts", opts);
645 }
646
647 // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
648 // maybe we should add queue to the WF schema, below job-tracker
649 actionConfToLauncherConf(actionConf, launcherJobConf);
650
651 // to disable cancelation of delegation token on launcher job end
652 launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
653
654 return launcherJobConf;
655 }
656 catch (Exception ex) {
657 throw convertException(ex);
658 }
659 }
660
661 private void injectCallback(Context context, Configuration conf) {
662 String callback = context.getCallbackUrl("$jobStatus");
663 if (conf.get("job.end.notification.url") != null) {
664 XLog.getLog(getClass()).warn("Overriding the action job end notification URI");
665 }
666 conf.set("job.end.notification.url", callback);
667 }
668
669 void injectActionCallback(Context context, Configuration actionConf) {
670 injectCallback(context, actionConf);
671 }
672
673 void injectLauncherCallback(Context context, Configuration launcherConf) {
674 injectCallback(context, launcherConf);
675 }
676
677 private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
678 for (String name : SPECIAL_PROPERTIES) {
679 if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
680 launcherConf.set(name, actionConf.get(name));
681 }
682 }
683 }
684
685 public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
686 JobClient jobClient = null;
687 boolean exception = false;
688 try {
689 Path appPathRoot = new Path(context.getWorkflow().getAppPath());
690
691 // app path could be a file
692 if (actionFs.isFile(appPathRoot)) {
693 appPathRoot = appPathRoot.getParent();
694 }
695
696 Element actionXml = XmlUtils.parseXml(action.getConf());
697
698 // action job configuration
699 Configuration actionConf = createBaseHadoopConf(context, actionXml);
700 setupActionConf(actionConf, context, actionXml, appPathRoot);
701 XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
702 setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
703
704 String jobName = actionConf.get(HADOOP_JOB_NAME);
705 if (jobName == null || jobName.isEmpty()) {
706 jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
707 getType(), context.getWorkflow().getAppName(),
708 action.getName(), context.getWorkflow().getId());
709 actionConf.set(HADOOP_JOB_NAME, jobName);
710 }
711
712 injectActionCallback(context, actionConf);
713
714 if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
715 // ONLY in the case where user has not given the
716 // modify-job ACL specifically
717 if (context.getWorkflow().getAcl() != null) {
718 // setting the group owning the Oozie job to allow anybody in that
719 // group to modify the jobs.
720 actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl());
721 }
722 }
723
724 // Setting the credential properties in launcher conf
725 HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
726 action, actionConf);
727
728 // Adding if action need to set more credential tokens
729 JobConf credentialsConf = new JobConf(false);
730 XConfiguration.copy(actionConf, credentialsConf);
731 setCredentialTokens(credentialsConf, context, action, credentialsProperties);
732
733 // insert conf to action conf from credentialsConf
734 for (Entry<String, String> entry : credentialsConf) {
735 if (actionConf.get(entry.getKey()) == null) {
736 actionConf.set(entry.getKey(), entry.getValue());
737 }
738 }
739
740 JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
741 injectLauncherCallback(context, launcherJobConf);
742 XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
743 jobClient = createJobClient(context, launcherJobConf);
744 String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
745 .getRecoveryId());
746 boolean alreadyRunning = launcherId != null;
747 RunningJob runningJob;
748
749 // if user-retry is on, always submit new launcher
750 boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
751
752 if (alreadyRunning && !isUserRetry) {
753 runningJob = jobClient.getJob(JobID.forName(launcherId));
754 if (runningJob == null) {
755 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
756 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
757 "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
758 }
759 }
760 else {
761 XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId());
762
763 // setting up propagation of the delegation token.
764 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
765 Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has
766 .getMRDelegationTokenRenewer(launcherJobConf));
767 launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
768
769 // insert credentials tokens to launcher job conf if needed
770 if (needInjectCredentials()) {
771 for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
772 log.debug("ADDING TOKEN: " + tk.getKind().toString());
773 launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
774 }
775 }
776 else {
777 log.info("No need to inject credentials.");
778 }
779 runningJob = jobClient.submitJob(launcherJobConf);
780 if (runningJob == null) {
781 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
782 "Error submitting launcher for action [{0}]", action.getId());
783 }
784 launcherId = runningJob.getID().toString();
785 XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId);
786 }
787
788 String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
789 String consoleUrl = runningJob.getTrackingURL();
790 context.setStartData(launcherId, jobTracker, consoleUrl);
791 }
792 catch (Exception ex) {
793 exception = true;
794 throw convertException(ex);
795 }
796 finally {
797 if (jobClient != null) {
798 try {
799 jobClient.close();
800 }
801 catch (Exception e) {
802 if (exception) {
803 log.error("JobClient error: ", e);
804 }
805 else {
806 throw convertException(e);
807 }
808 }
809 }
810 }
811 }
812
813 private boolean needInjectCredentials() {
814 boolean methodExists = true;
815
816 Class klass;
817 try {
818 klass = Class.forName("org.apache.hadoop.mapred.JobConf");
819 klass.getMethod("getCredentials");
820 }
821 catch (ClassNotFoundException ex) {
822 methodExists = false;
823 }
824 catch (NoSuchMethodException ex) {
825 methodExists = false;
826 }
827
828 return methodExists;
829 }
830
831 protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
832 WorkflowAction action, Configuration actionConf) throws Exception {
833 HashMap<String, CredentialsProperties> credPropertiesMap = null;
834 if (context != null && action != null) {
835 credPropertiesMap = getActionCredentialsProperties(context, action);
836 if (credPropertiesMap != null) {
837 for (String key : credPropertiesMap.keySet()) {
838 CredentialsProperties prop = credPropertiesMap.get(key);
839 if (prop != null) {
840 log.debug("Credential Properties set for action : " + action.getId());
841 for (String property : prop.getProperties().keySet()) {
842 actionConf.set(property, prop.getProperties().get(property));
843 log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
844 }
845 }
846 }
847 }
848 else {
849 log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
850 }
851 }
852 else {
853 log.warn("context or action is null");
854 }
855 return credPropertiesMap;
856 }
857
858 protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
859 HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
860
861 if (context != null && action != null && credPropertiesMap != null) {
862 for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
863 String credName = entry.getKey();
864 CredentialsProperties credProps = entry.getValue();
865 if (credProps != null) {
866 CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
867 Credentials credentialObject = credProvider.createCredentialObject();
868 if (credentialObject != null) {
869 credentialObject.addtoJobConf(jobconf, credProps, context);
870 log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
871 }
872 else {
873 log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
874 }
875 }
876 }
877 }
878
879 }
880
881 protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
882 WorkflowAction action) throws Exception {
883 HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
884 if (context != null && action != null) {
885 String credsInAction = action.getCred();
886 log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
887 String[] credNames = credsInAction.split(",");
888 for (String credName : credNames) {
889 CredentialsProperties credProps = getCredProperties(context, credName);
890 props.put(credName, credProps);
891 }
892 }
893 else {
894 log.warn("context or action is null");
895 }
896 return props;
897 }
898
899 @SuppressWarnings("unchecked")
900 protected CredentialsProperties getCredProperties(Context context, String credName)
901 throws Exception {
902 CredentialsProperties credProp = null;
903 String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
904 XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
905 Element elementJob = XmlUtils.parseXml(workflowXml);
906 Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
907 if (credentials != null) {
908 for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) {
909 String name = credential.getAttributeValue("name");
910 String type = credential.getAttributeValue("type");
911 log.debug("getCredProperties: Name: " + name + ", Type: " + type);
912 if (name.equalsIgnoreCase(credName)) {
913 credProp = new CredentialsProperties(name, type);
914 for (Element property : (List<Element>) credential.getChildren("property",
915 credential.getNamespace())) {
916 String propertyName = property.getChildText("name", property.getNamespace());
917 String propertyValue = property.getChildText("value", property.getNamespace());
918 ELEvaluator eval = new ELEvaluator();
919 for (Map.Entry<String, String> entry : wfjobConf) {
920 eval.setVariable(entry.getKey(), entry.getValue().trim());
921 }
922 propertyName = eval.evaluate(propertyName, String.class);
923 propertyValue = eval.evaluate(propertyValue, String.class);
924
925 credProp.getProperties().put(propertyName, propertyValue);
926 log.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '"
927 + propertyValue + "'");
928 }
929 }
930 }
931 } else {
932 log.warn("credentials is null for the action");
933 }
934 return credProp;
935 }
936
937 @Override
938 public void start(Context context, WorkflowAction action) throws ActionExecutorException {
939 try {
940 XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
941 FileSystem actionFs = context.getAppFileSystem();
942 XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
943 prepareActionDir(actionFs, context);
944 XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
945 submitLauncher(actionFs, context, action);
946 XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
947 check(context, action);
948 XLog.getLog(getClass()).debug("Action check is done after submission");
949 }
950 catch (Exception ex) {
951 throw convertException(ex);
952 }
953 }
954
955 @Override
956 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
957 try {
958 String externalStatus = action.getExternalStatus();
959 WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
960 : WorkflowAction.Status.ERROR;
961 context.setEndData(status, getActionSignal(status));
962 }
963 catch (Exception ex) {
964 throw convertException(ex);
965 }
966 finally {
967 try {
968 FileSystem actionFs = context.getAppFileSystem();
969 cleanUpActionDir(actionFs, context);
970 }
971 catch (Exception ex) {
972 throw convertException(ex);
973 }
974 }
975 }
976
977 /**
978 * Create job client object
979 *
980 * @param context
981 * @param jobConf
982 * @return
983 * @throws HadoopAccessorException
984 */
985 protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
986 String user = context.getWorkflow().getUser();
987 String group = context.getWorkflow().getGroup();
988 return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
989 }
990
991 protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
992 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
993 return runningJob;
994 }
995
996 @Override
997 public void check(Context context, WorkflowAction action) throws ActionExecutorException {
998 JobClient jobClient = null;
999 boolean exception = false;
1000 try {
1001 Element actionXml = XmlUtils.parseXml(action.getConf());
1002 FileSystem actionFs = context.getAppFileSystem();
1003 JobConf jobConf = createBaseHadoopConf(context, actionXml);
1004 jobClient = createJobClient(context, jobConf);
1005 RunningJob runningJob = getRunningJob(context, action, jobClient);
1006 if (runningJob == null) {
1007 context.setExternalStatus(FAILED);
1008 context.setExecutionData(FAILED, null);
1009 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
1010 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", action
1011 .getExternalId(), action.getId());
1012 }
1013 if (runningJob.isComplete()) {
1014 Path actionDir = context.getActionDir();
1015
1016 String user = context.getWorkflow().getUser();
1017 String group = context.getWorkflow().getGroup();
1018 if (LauncherMapperHelper.hasIdSwap(runningJob, user, group, actionDir)) {
1019 String launcherId = action.getExternalId();
1020 Path idSwapPath = LauncherMapperHelper.getIdSwapPath(context.getActionDir());
1021 InputStream is = actionFs.open(idSwapPath);
1022 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1023 Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1024 reader.close();
1025 String newId = props.getProperty("id");
1026 runningJob = jobClient.getJob(JobID.forName(newId));
1027 if (runningJob == null) {
1028 context.setExternalStatus(FAILED);
1029 throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
1030 "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId,
1031 action.getId());
1032 }
1033
1034 context.setExternalChildIDs(newId);
1035 XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
1036 newId);
1037 }
1038 if (runningJob.isComplete()) {
1039 XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
1040 action.getExternalId());
1041 if (runningJob.isSuccessful() && LauncherMapperHelper.isMainSuccessful(runningJob)) {
1042 getActionData(actionFs, runningJob, action, context);
1043 XLog.getLog(getClass()).info(XLog.STD, "action produced output");
1044 }
1045 else {
1046 XLog log = XLog.getLog(getClass());
1047 String errorReason;
1048 Path actionError = LauncherMapperHelper.getErrorPath(context.getActionDir());
1049 if (actionFs.exists(actionError)) {
1050 InputStream is = actionFs.open(actionError);
1051 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1052 Properties props = PropertiesUtils.readProperties(reader, -1);
1053 reader.close();
1054 String errorCode = props.getProperty("error.code");
1055 if (errorCode.equals("0")) {
1056 errorCode = "JA018";
1057 }
1058 if (errorCode.equals("-1")) {
1059 errorCode = "JA019";
1060 }
1061 errorReason = props.getProperty("error.reason");
1062 log.warn("Launcher ERROR, reason: {0}", errorReason);
1063 String exMsg = props.getProperty("exception.message");
1064 String errorInfo = (exMsg != null) ? exMsg : errorReason;
1065 context.setErrorInfo(errorCode, errorInfo);
1066 String exStackTrace = props.getProperty("exception.stacktrace");
1067 if (exMsg != null) {
1068 log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
1069 }
1070 }
1071 else {
1072 errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
1073 .getTrackerUri(), action.getExternalId());
1074 log.warn(errorReason);
1075 }
1076 context.setExecutionData(FAILED_KILLED, null);
1077 setActionCompletionData(context, actionFs);
1078 }
1079 }
1080 else {
1081 context.setExternalStatus(RUNNING);
1082 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1083 action.getExternalId(), action.getExternalStatus());
1084 }
1085 }
1086 else {
1087 context.setExternalStatus(RUNNING);
1088 XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1089 action.getExternalId(), action.getExternalStatus());
1090 }
1091 }
1092 catch (Exception ex) {
1093 XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
1094 exception = true;
1095 throw convertException(ex);
1096 }
1097 finally {
1098 if (jobClient != null) {
1099 try {
1100 jobClient.close();
1101 }
1102 catch (Exception e) {
1103 if (exception) {
1104 log.error("JobClient error: ", e);
1105 }
1106 else {
1107 throw convertException(e);
1108 }
1109 }
1110 }
1111 }
1112 }
1113
1114 /**
1115 * Get the output data of an action. Subclasses should override this method
1116 * to get action specific output data.
1117 *
1118 * @param actionFs the FileSystem object
1119 * @param runningJob the runningJob
1120 * @param action the Workflow action
1121 * @param context executor context
1122 *
1123 */
1124 protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
1125 throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
1126 Properties props = null;
1127 if (getCaptureOutput(action)) {
1128 props = new Properties();
1129 if (LauncherMapperHelper.hasOutputData(runningJob)) {
1130 Path actionOutput = LauncherMapperHelper.getOutputDataPath(context.getActionDir());
1131 InputStream is = actionFs.open(actionOutput);
1132 BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1133 props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1134 reader.close();
1135 }
1136 }
1137 context.setExecutionData(SUCCEEDED, props);
1138 }
1139
1140 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
1141 Element eConf = XmlUtils.parseXml(action.getConf());
1142 Namespace ns = eConf.getNamespace();
1143 Element captureOutput = eConf.getChild("capture-output", ns);
1144 return captureOutput != null;
1145 }
1146
1147 @Override
1148 public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
1149 JobClient jobClient = null;
1150 boolean exception = false;
1151 try {
1152 Element actionXml = XmlUtils.parseXml(action.getConf());
1153 JobConf jobConf = createBaseHadoopConf(context, actionXml);
1154 jobClient = createJobClient(context, jobConf);
1155 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
1156 if (runningJob != null) {
1157 runningJob.killJob();
1158 }
1159 context.setExternalStatus(KILLED);
1160 context.setExecutionData(KILLED, null);
1161 }
1162 catch (Exception ex) {
1163 exception = true;
1164 throw convertException(ex);
1165 }
1166 finally {
1167 try {
1168 FileSystem actionFs = context.getAppFileSystem();
1169 cleanUpActionDir(actionFs, context);
1170 if (jobClient != null) {
1171 jobClient.close();
1172 }
1173 }
1174 catch (Exception ex) {
1175 if (exception) {
1176 log.error("Error: ", ex);
1177 }
1178 else {
1179 throw convertException(ex);
1180 }
1181 }
1182 }
1183 }
1184
1185 private static Set<String> FINAL_STATUS = new HashSet<String>();
1186
1187 static {
1188 FINAL_STATUS.add(SUCCEEDED);
1189 FINAL_STATUS.add(KILLED);
1190 FINAL_STATUS.add(FAILED);
1191 FINAL_STATUS.add(FAILED_KILLED);
1192 }
1193
1194 @Override
1195 public boolean isCompleted(String externalStatus) {
1196 return FINAL_STATUS.contains(externalStatus);
1197 }
1198
1199
1200 /**
1201 * Return the sharelib names for the action.
1202 * <p/>
1203 * If <code>NULL</code> or empty, it means that the action does not use the action
1204 * sharelib.
1205 * <p/>
1206 * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
1207 * action sharelib sub-directory <code>foo</code> and all JARs in the sharelib
1208 * <code>foo</code> directory will be in the action classpath. Multiple sharelib
1209 * sub-directories can be specified as a comma separated list.
1210 * <p/>
1211 * The resolution is done using the following precedence order:
1212 * <ul>
1213 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li>
1214 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li>
1215 * <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li>
1216 * <li>Action Executor <code>getDefaultShareLibName()</code> method</li>
1217 * </ul>
1218 *
1219 *
1220 * @param context executor context.
1221 * @param actionXml
1222 * @param conf action configuration.
1223 * @return the action sharelib names.
1224 */
1225 protected String[] getShareLibNames(Context context, Element actionXml, Configuration conf) {
1226 String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType());
1227 if (names == null || names.length == 0) {
1228 try {
1229 XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
1230 names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType());
1231 if (names == null || names.length == 0) {
1232 names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType());
1233 if (names == null || names.length == 0) {
1234 String name = getDefaultShareLibName(actionXml);
1235 if (name != null) {
1236 names = new String[] { name };
1237 }
1238 }
1239 }
1240 }
1241 catch (IOException ex) {
1242 throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
1243 }
1244 }
1245 return names;
1246 }
1247
1248 private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";
1249
1250
1251 /**
1252 * Returns the default sharelib name for the action if any.
1253 *
1254 * @param actionXml the action XML fragment.
1255 * @return the sharelib name for the action, <code>NULL</code> if none.
1256 */
1257 protected String getDefaultShareLibName(Element actionXml) {
1258 return null;
1259 }
1260
1261 /**
1262 * Sets some data for the action on completion
1263 *
1264 * @param context executor context
1265 * @param actionFs the FileSystem object
1266 */
1267 protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException,
1268 HadoopAccessorException, URISyntaxException {
1269 }
1270 }