/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLDecoder;
import java.text.MessageFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.Map.Entry;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.client.rest.JsonUtils;
import org.apache.oozie.hadoop.utils.HadoopShims;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import com.google.common.annotations.VisibleForTesting;

import org.apache.oozie.ErrorCode;
import org.jdom.JDOMException;

/**
 * Service that caches the locations of share lib and launcher lib jars.
 * <p>
 * On {@link #init(Services)} it uploads the launcher jars (when shipping is enabled), builds a map from action key to
 * the list of share lib paths, and schedules a periodic purge of old timestamped lib directories. The share lib map
 * is built either from a mapping file ({@link #SHARELIB_MAPPING_FILE}) or from the latest <code>lib_</code> directory
 * under the system lib path.
 */
public class ShareLibService implements Service, Instrumentable {

    public static final String LAUNCHERJAR_LIB_RETENTION = CONF_PREFIX + "ShareLibService.temp.sharelib.retention.days";

    public static final String SHARELIB_MAPPING_FILE = CONF_PREFIX + "ShareLibService.mapping.file";

    public static final String SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar";

    public static final String PURGE_INTERVAL = CONF_PREFIX + "ShareLibService.purge.interval";

    public static final String FAIL_FAST_ON_STARTUP = CONF_PREFIX + "ShareLibService.fail.fast.on.startup";

    private static final String PERMISSION_STRING = "-rwxr-xr-x";

    public static final String LAUNCHER_LIB_PREFIX = "launcher_";

    public static final String SHARE_LIB_PREFIX = "lib_";

    // NOTE: SimpleDateFormat is not thread-safe. Inside this class it is only used through
    // parseTimestamp()/formatTimestamp(), which synchronize on this instance.
    public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");

    private Services services;

    private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>();

    private Map<String, Map<Path, Configuration>> shareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();

    private Map<String, List<Path>> launcherLibMap = new HashMap<String, List<Path>>();

    private Set<String> actionConfSet = new HashSet<String>();

    // symlink mapping. Oozie keeps on checking symlink path and if changes, Oozie reloads the sharelib
    private Map<String, Map<Path, Path>> symlinkMapping = new HashMap<String, Map<Path, Path>>();

    private static XLog LOG = XLog.getLog(ShareLibService.class);

    private String sharelibMappingFile;

    private boolean isShipLauncherEnabled = false;

    public static String SHARE_LIB_CONF_PREFIX = "oozie";

    private boolean shareLibLoadAttempted = false;

    private String sharelibMetaFileOldTimeStamp;

    private String sharelibDirOld;

    FileSystem fs;

    // Retention period in milliseconds. The leading long literal forces 64-bit arithmetic; with int arithmetic the
    // product overflows once the configured retention reaches 25 days.
    final long retentionTime = 1000L * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION);

    /**
     * Initializes the service: reads its configuration, uploads launcher jars, caches the share lib and schedules
     * the periodic purge of old lib directories.
     *
     * @param services the services
     * @throws ServiceException if caching fails and {@link #FAIL_FAST_ON_STARTUP} is enabled
     */
    @Override
    public void init(Services services) throws ServiceException {
        this.services = services;
        sharelibMappingFile = ConfigurationService.get(services.getConf(), SHARELIB_MAPPING_FILE);
        isShipLauncherEnabled = ConfigurationService.getBoolean(services.getConf(), SHIP_LAUNCHER_JAR);
        boolean failOnfailure = ConfigurationService.getBoolean(services.getConf(), FAIL_FAST_ON_STARTUP);
        Path launcherlibPath = getLauncherlibPath();
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        URI uri = launcherlibPath.toUri();
        try {
            fs = FileSystem.get(has.createJobConf(uri.getAuthority()));
            //cache action key sharelib conf list
            cacheActionKeySharelibConfList();
            updateLauncherLib();
            updateShareLib();
        }
        catch (Throwable e) {
            if (failOnfailure) {
                LOG.error("Sharelib initialization fails", e);
                throw new ServiceException(ErrorCode.E0104, getClass().getName(), "Sharelib initialization fails. ", e);
            }
            else {
                // We don't want to actually fail init by throwing an Exception, so only create the ServiceException and
                // log it
                ServiceException se = new ServiceException(ErrorCode.E0104, getClass().getName(),
                        "Not able to cache sharelib. An Admin needs to install the sharelib with oozie-setup.sh and issue the "
                                + "'oozie admin' CLI command to update the sharelib", e);
                LOG.error(se);
            }
        }
        Runnable purgeLibsRunnable = new Runnable() {
            @Override
            public void run() {
                System.out.flush();
                try {
                    // Only one server should purge sharelib
                    if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
                        final Date current = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
                        purgeLibs(fs, LAUNCHER_LIB_PREFIX, current);
                        purgeLibs(fs, SHARE_LIB_PREFIX, current);
                    }
                }
                catch (IOException e) {
                    LOG.error("There was an issue purging the sharelib", e);
                }
            }
        };
        services.get(SchedulerService.class).schedule(purgeLibsRunnable, 10,
                ConfigurationService.getInt(services.getConf(), PURGE_INTERVAL) * 60 * 60 * 24,
                SchedulerService.Unit.SEC);
    }

    /**
     * Creates the file system for the authority of the launcher lib path.
     *
     * @return the file system
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private FileSystem createFileSystem() throws IOException {
        Path launcherlibPath = getLauncherlibPath();
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        URI uri = launcherlibPath.toUri();
        return FileSystem.get(has.createJobConf(uri.getAuthority()));
    }

    /**
     * Parses a lib directory timestamp, serializing access to the shared {@link #dateFormat}.
     *
     * @param timestamp the timestamp text
     * @return the parsed date
     * @throws ParseException if the text does not match the format
     */
    private static Date parseTimestamp(String timestamp) throws ParseException {
        synchronized (dateFormat) {
            return dateFormat.parse(timestamp);
        }
    }

    /**
     * Formats a date as a lib directory timestamp, serializing access to the shared {@link #dateFormat}.
     *
     * @param date the date
     * @return the formatted timestamp
     */
    private static String formatTimestamp(Date date) {
        synchronized (dateFormat) {
            return dateFormat.format(date);
        }
    }

    /**
     * Uploads the launcher jars to a new timestamped launcher lib directory and makes the directory tree readable
     * and executable. Does nothing when shipping of launcher jars is disabled.
     *
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void updateLauncherLib() throws IOException {
        if (isShipLauncherEnabled) {
            if (fs == null) {
                fs = createFileSystem();
            }
            Path launcherlibPath = getLauncherlibPath();
            setupLauncherLibPath(fs, launcherlibPath);
            recursiveChangePermissions(fs, launcherlibPath, FsPermission.valueOf(PERMISSION_STRING));
        }

    }

    /**
     * Copy launcher jars to Temp directory.
     *
     * @param fs the FileSystem
     * @param tmpLauncherLibPath the tmp launcher lib path
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void setupLauncherLibPath(FileSystem fs, Path tmpLauncherLibPath) throws IOException {

        ActionService actionService = Services.get().get(ActionService.class);
        List<Class> classes = JavaActionExecutor.getCommonLauncherClasses();
        Path baseDir = new Path(tmpLauncherLibPath, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
        copyJarContainingClasses(classes, fs, baseDir, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
        Set<String> actionTypes = actionService.getActionTypes();
        for (String key : actionTypes) {
            ActionExecutor executor = actionService.getExecutor(key);
            if (executor instanceof JavaActionExecutor) {
                JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
                classes = jexecutor.getLauncherClasses();
                if (classes != null) {
                    String type = executor.getType();
                    Path executorDir = new Path(tmpLauncherLibPath, type);
                    copyJarContainingClasses(classes, fs, executorDir, type);
                }
            }
        }
    }

    /**
     * Recursive change permissions.
     *
     * @param fs the FileSystem
     * @param path the Path
     * @param fsPerm the permission to apply to the path and everything below it
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void recursiveChangePermissions(FileSystem fs, Path path, FsPermission fsPerm) throws IOException {
        fs.setPermission(path, fsPerm);
        FileStatus[] filesStatus = fs.listStatus(path);
        for (int i = 0; i < filesStatus.length; i++) {
            Path p = filesStatus[i].getPath();
            if (filesStatus[i].isDir()) {
                recursiveChangePermissions(fs, p, fsPerm);
            }
            else {
                fs.setPermission(p, fsPerm);
            }
        }
    }

    /**
     * Copy jar containing classes.
     *
     * @param classes the classes
     * @param fs the FileSystem
     * @param executorDir is Path
     * @param type is sharelib key
     * @throws IOException Signals that an I/O exception has occurred, or that no jar was found for one of the
     *         classes.
     */
    private void copyJarContainingClasses(List<Class> classes, FileSystem fs, Path executorDir, String type)
            throws IOException {
        fs.mkdirs(executorDir);
        // A set, so that a jar holding several of the classes is uploaded only once.
        Set<String> localJarSet = new HashSet<String>();
        for (Class c : classes) {
            String localJar = findContainingJar(c);
            if (localJar != null) {
                localJarSet.add(localJar);
            }
            else {
                throw new IOException("No jar containing " + c + " found");
            }
        }
        List<Path> listOfPaths = new ArrayList<Path>();
        for (String localJarStr : localJarSet) {
            File localJar = new File(localJarStr);
            copyFromLocalFile(localJar, fs, executorDir);
            Path path = new Path(executorDir, localJar.getName());
            listOfPaths.add(path);
            LOG.info(localJar.getName() + " uploaded to " + executorDir.toString());
        }
        launcherLibMap.put(type, listOfPaths);

    }

    /**
     * Copies a local file into a directory of the destination file system, overwriting an existing file of the same
     * name.
     *
     * @param src the local file
     * @param dstFS the destination file system
     * @param dstDir the destination directory
     * @return always true
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private static boolean copyFromLocalFile(File src, FileSystem dstFS, Path dstDir) throws IOException {
        Path dst = new Path(dstDir, src.getName());
        InputStream in = null;
        OutputStream out = null;
        try {
            in = new FileInputStream(src);
            out = dstFS.create(dst, true);
            // The final 'true' asks copyBytes to close both streams when it is done.
            IOUtils.copyBytes(in, out, dstFS.getConf(), true);
        }
        catch (IOException e) {
            IOUtils.closeStream(out);
            IOUtils.closeStream(in);
            throw e;
        }
        return true;

    }

    /**
     * Collects all files found at or below a path and caches those that are action configuration files.
     *
     * @param fs the FileSystem
     * @param rootDir the root directory, or a single file; nothing is done when null
     * @param listOfPaths the list the found paths are added to
     * @param shareLibKey the share lib key
     * @param shareLibConfigMap the map the parsed configuration files are added to
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void getPathRecursively(FileSystem fs, Path rootDir, List<Path> listOfPaths, String shareLibKey,
            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
        if (rootDir == null) {
            return;
        }

        try {
            // Only the path component of the URI is used to access the file; a fragment, if present, is dropped.
            Path filePath = new Path(new URI(rootDir.toString()).getPath());
            if (fs.isFile(filePath)) {
                Path qualifiedRootDirPath = fs.makeQualified(rootDir);
                if (isFilePartOfConfList(rootDir)) {
                    cachePropertyFile(qualifiedRootDirPath, filePath, shareLibKey, shareLibConfigMap);
                }
                listOfPaths.add(qualifiedRootDirPath);
                return;
            }

            FileStatus[] status = fs.listStatus(rootDir);
            if (status == null) {
                LOG.info("Shared lib " + rootDir + " doesn't exist, not adding to cache");
                return;
            }

            for (FileStatus file : status) {
                if (file.isDir()) {
                    getPathRecursively(fs, file.getPath(), listOfPaths, shareLibKey, shareLibConfigMap);
                }
                else {
                    if (isFilePartOfConfList(file.getPath())) {
                        cachePropertyFile(file.getPath(), file.getPath(), shareLibKey, shareLibConfigMap);
                    }
                    listOfPaths.add(file.getPath());
                }
            }
        }
        catch (URISyntaxException e) {
            throw new IOException(e);
        }
        catch (JDOMException e) {
            throw new IOException(e);
        }
    }

    /**
     * Gets the cached share lib map.
     *
     * @return map from share lib key to the list of paths cached for it
     */
    public Map<String, List<Path>> getShareLib() {
        return shareLibMap;
    }

    private Map<String, Map<Path, Path>> getSymlinkMapping() {
        return symlinkMapping;
    }

    /**
     * Gets the action sharelib lib jars.
     *
     * @param shareLibKey the sharelib key
     * @return List of paths, or null when nothing is cached for the key
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public List<Path> getShareLibJars(String shareLibKey) throws IOException {
        // Sharelib map is empty means that on previous or startup attempt of
        // caching sharelib has failed.Trying to reload
        if (shareLibMap.isEmpty() && !shareLibLoadAttempted) {
            synchronized (ShareLibService.class) {
                if (shareLibMap.isEmpty()) {
                    updateShareLib();
                    shareLibLoadAttempted = true;
                }
            }
        }
        checkSymlink(shareLibKey);
        return shareLibMap.get(shareLibKey);
    }

    /**
     * Reloads the share lib of a key from the mapping file when the target of one of its symlinks has changed.
     *
     * @param shareLibKey the sharelib key
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void checkSymlink(String shareLibKey) throws IOException {
        // Read the field once: updateShareLib() may replace the map while this method runs.
        Map<Path, Path> symlinksForKey = symlinkMapping.get(shareLibKey);
        if (!HadoopShims.isSymlinkSupported() || symlinksForKey == null || symlinksForKey.isEmpty()) {
            return;
        }

        HadoopShims fileSystem = new HadoopShims(fs);
        for (Entry<Path, Path> symlink : symlinksForKey.entrySet()) {
            Path path = symlink.getKey();
            if (!symlink.getValue().equals(fileSystem.getSymLinkTarget(path))) {
                synchronized (ShareLibService.class) {
                    // Work on copies and publish them only after the reload has succeeded.
                    Map<String, List<Path>> tmpShareLibMap = new HashMap<String, List<Path>>(shareLibMap);

                    Map<String, Map<Path, Configuration>> tmpShareLibConfigMap =
                            new HashMap<String, Map<Path, Configuration>>(shareLibConfigMap);

                    Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>(
                            symlinkMapping);

                    LOG.info(MessageFormat.format("Symlink target for [{0}] has changed, was [{1}], now [{2}]",
                            shareLibKey, path, fileSystem.getSymLinkTarget(path)));
                    loadShareLibMetaFile(tmpShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile,
                            shareLibKey);
                    shareLibMap = tmpShareLibMap;
                    symlinkMapping = tmpSymlinkMapping;
                    shareLibConfigMap = tmpShareLibConfigMap;
                    return;
                }

            }
        }

    }

    /**
     * Gets the launcher jars.
     *
     * @param shareLibKey the shareLib key
     * @return launcher jars paths; for the common lib key the share lib jars of that key are appended
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public List<Path> getSystemLibJars(String shareLibKey) throws IOException {
        List<Path> returnList = new ArrayList<Path>();
        // Sharelib map is empty means that on previous or startup attempt of
        // caching launcher jars has failed.Trying to reload
        if (isShipLauncherEnabled) {
            if (launcherLibMap.isEmpty()) {
                synchronized (ShareLibService.class) {
                    if (launcherLibMap.isEmpty()) {
                        updateLauncherLib();
                    }
                }
            }
            List<Path> launcherJars = launcherLibMap.get(shareLibKey);
            if (launcherJars != null) {
                returnList.addAll(launcherJars);
            }
        }
        if (shareLibKey.equals(JavaActionExecutor.OOZIE_COMMON_LIBDIR)) {
            List<Path> sharelibList = getShareLibJars(shareLibKey);
            if (sharelibList != null) {
                returnList.addAll(sharelibList);
            }
        }
        return returnList;
    }

    /**
     * Finds the local jar file that contains a class.
     *
     * @param clazz the clazz
     * @return the local path of the jar, or null if the class was not loaded from a jar file
     */
    @VisibleForTesting
    protected String findContainingJar(Class clazz) {
        ClassLoader loader = clazz.getClassLoader();
        if (loader == null) {
            // getClassLoader() may return null (e.g. for classes of the bootstrap loader); no jar can be located.
            return null;
        }
        String classFile = clazz.getName().replace('.', '/') + ".class";
        try {
            for (Enumeration<URL> itr = loader.getResources(classFile); itr.hasMoreElements();) {
                URL url = itr.nextElement();
                if ("jar".equals(url.getProtocol())) {
                    String toReturn = url.getPath();
                    if (toReturn.startsWith("file:")) {
                        toReturn = toReturn.substring("file:".length());
                        // URLDecoder is a misnamed class, since it actually decodes
                        // x-www-form-urlencoded MIME type rather than actual
                        // URL encoding (which the file path has). Therefore it would
                        // decode +s to ' 's which is incorrect (spaces are actually
                        // either unencoded or encoded as "%20"). Replace +s first, so
                        // that they are kept sacred during the decoding process.
                        toReturn = toReturn.replace("+", "%2B");
                        toReturn = URLDecoder.decode(toReturn, "UTF-8");
                        // Strip the "!/path/inside/jar" suffix.
                        toReturn = toReturn.replaceAll("!.*$", "");
                        return toReturn;
                    }
                }
            }
        }
        catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
        return null;
    }

    /**
     * Purge libs.
     *
     * @param fs the fs
     * @param prefix the prefix
     * @param current the current time
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void purgeLibs(FileSystem fs, final String prefix, final Date current) throws IOException {
        Path executorLibBasePath = services.get(WorkflowAppService.class).getSystemLibPath();
        // Accepts only directories named <prefix><timestamp> whose timestamp is older than the retention time.
        PathFilter directoryFilter = new PathFilter() {
            @Override
            public boolean accept(Path path) {
                String name = path.getName();
                if (!name.startsWith(prefix)) {
                    return false;
                }
                Date d;
                try {
                    d = parseTimestamp(name.substring(prefix.length()));
                }
                catch (ParseException e) {
                    return false;
                }
                return (current.getTime() - d.getTime()) > retentionTime;
            }
        };
        FileStatus[] dirList = fs.listStatus(executorLibBasePath, directoryFilter);
        if (dirList == null) {
            // Same guard as the other listStatus() callers in this class: nothing to purge.
            return;
        }
        Arrays.sort(dirList, new Comparator<FileStatus>() {
            // sort in desc order
            @Override
            public int compare(FileStatus o1, FileStatus o2) {
                return o2.getPath().getName().compareTo(o1.getPath().getName());
            }
        });

        // Logic is to keep every lib younger than the retention time (never listed here) plus the latest lib older
        // than the retention time (index 0 after the descending sort). refer OOZIE-1761
        for (int i = 1; i < dirList.length; i++) {
            Path dirPath = dirList[i].getPath();
            fs.delete(dirPath, true);
            LOG.info("Deleted old launcher jar lib directory {0}", dirPath.getName());
        }
    }

    @Override
    public void destroy() {
        shareLibMap.clear();
        launcherLibMap.clear();
    }

    @Override
    public Class<? extends Service> getInterface() {
        return ShareLibService.class;
    }

    /**
     * Update share lib cache.
     *
     * @return status entries describing the source used (mapping file or share lib directory) with its new and
     *         previous value
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public Map<String, String> updateShareLib() throws IOException {
        Map<String, String> status = new HashMap<String, String>();

        if (fs == null) {
            fs = createFileSystem();
        }

        // Build into fresh maps and publish them only after loading has succeeded.
        Map<String, List<Path>> tempShareLibMap = new HashMap<String, List<Path>>();
        Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>();
        Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();

        if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
            String sharelibMetaFileNewTimeStamp = JsonUtils.formatDateRfc822(
                    new Date(fs.getFileStatus(new Path(sharelibMappingFile)).getModificationTime()), "GMT");
            loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, null);
            status.put("sharelibMetaFile", sharelibMappingFile);
            status.put("sharelibMetaFileNewTimeStamp", sharelibMetaFileNewTimeStamp);
            status.put("sharelibMetaFileOldTimeStamp", sharelibMetaFileOldTimeStamp);
            sharelibMetaFileOldTimeStamp = sharelibMetaFileNewTimeStamp;
        }
        else {
            Path shareLibpath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
                    SHARE_LIB_PREFIX);
            loadShareLibfromDFS(tempShareLibMap, shareLibpath, tmpShareLibConfigMap);

            if (shareLibpath != null) {
                status.put("sharelibDirNew", shareLibpath.toString());
                status.put("sharelibDirOld", sharelibDirOld);
                sharelibDirOld = shareLibpath.toString();
            }

        }
        shareLibMap = tempShareLibMap;
        symlinkMapping = tmpSymlinkMapping;
        shareLibConfigMap = tmpShareLibConfigMap;
        return status;
    }

    /**
     * Update share lib cache. Parse the share lib directory and each sub directory is a action key
     *
     * @param shareLibMap the share lib jar map
     * @param shareLibpath the share libpath
     * @param shareLibConfigMap the map the parsed configuration files are added to
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void loadShareLibfromDFS(Map<String, List<Path>> shareLibMap, Path shareLibpath,
            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {

        if (shareLibpath == null) {
            LOG.info("No share lib directory found");
            return;

        }

        FileStatus[] dirList = fs.listStatus(shareLibpath);

        if (dirList == null) {
            return;
        }

        for (FileStatus dir : dirList) {
            if (!dir.isDir()) {
                continue;
            }
            String actionKey = dir.getPath().getName();
            List<Path> listOfPaths = new ArrayList<Path>();
            getPathRecursively(fs, dir.getPath(), listOfPaths, actionKey, shareLibConfigMap);
            shareLibMap.put(actionKey, listOfPaths);
            LOG.info("Share lib for " + actionKey + ":" + listOfPaths);

        }

    }

    /**
     * Load share lib text file. Sharelib mapping files contains list of key=value. where key is the action key
     * prefixed with {@link #SHARE_LIB_CONF_PREFIX} and one separator character, and value is a comma separated list
     * of DFS locations of sharelib files.
     *
     * @param shareLibMap the share lib jar map
     * @param symlinkMapping the symlink mapping
     * @param shareLibConfigMap the map the parsed configuration files are added to
     * @param sharelibFileMapping the sharelib file mapping
     * @param shareLibKey the share lib key to load, or null to load every key of the file
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void loadShareLibMetaFile(Map<String, List<Path>> shareLibMap, Map<String, Map<Path, Path>> symlinkMapping,
            Map<String, Map<Path, Configuration>> shareLibConfigMap, String sharelibFileMapping, String shareLibKey)
            throws IOException {

        Path shareFileMappingPath = new Path(sharelibFileMapping);
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        FileSystem filesystem = FileSystem.get(has.createJobConf(shareFileMappingPath.toUri().getAuthority()));
        Properties prop = new Properties();
        InputStream in = filesystem.open(shareFileMappingPath);
        try {
            prop.load(in);
        }
        finally {
            // Properties.load() does not close the stream.
            IOUtils.closeStream(in);
        }

        int mapKeyOffset = SHARE_LIB_CONF_PREFIX.length() + 1;
        for (Object keyObject : prop.keySet()) {
            String key = (String) keyObject;
            // Check the prefix before cutting it off, otherwise a short unrelated key makes substring() throw.
            if (key.length() < mapKeyOffset || !key.toLowerCase().startsWith(SHARE_LIB_CONF_PREFIX)) {
                continue;
            }
            String mapKey = key.substring(mapKeyOffset);
            if (shareLibKey == null || shareLibKey.equals(mapKey)) {
                loadSharelib(shareLibMap, symlinkMapping, shareLibConfigMap, mapKey,
                        ((String) prop.get(key)).split(","));
            }
        }
    }

    /**
     * Loads the paths of one share lib key and records the symlinks among them.
     *
     * @param tmpShareLibMap the share lib jar map to fill
     * @param tmpSymlinkMapping the symlink mapping to fill
     * @param shareLibConfigMap the map the parsed configuration files are added to
     * @param shareLibKey the share lib key
     * @param pathList the DFS locations configured for the key
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void loadSharelib(Map<String, List<Path>> tmpShareLibMap, Map<String, Map<Path, Path>> tmpSymlinkMapping,
            Map<String, Map<Path, Configuration>> shareLibConfigMap, String shareLibKey, String pathList[])
            throws IOException {
        List<Path> listOfPaths = new ArrayList<Path>();
        Map<Path, Path> symlinkMappingforAction = new HashMap<Path, Path>();
        HadoopShims fileSystem = new HadoopShims(fs);

        for (String dfsPath : pathList) {
            Path path = new Path(dfsPath);
            getPathRecursively(fs, path, listOfPaths, shareLibKey, shareLibConfigMap);
            if (HadoopShims.isSymlinkSupported() && fileSystem.isSymlink(path)) {
                symlinkMappingforAction.put(fs.makeQualified(path), fileSystem.getSymLinkTarget(path));
            }
        }
        if (HadoopShims.isSymlinkSupported()) {
            LOG.info("symlink for " + shareLibKey + ":" + symlinkMappingforAction);
            tmpSymlinkMapping.put(shareLibKey, symlinkMappingforAction);
        }
        tmpShareLibMap.put(shareLibKey, listOfPaths);
        LOG.info("Share lib for " + shareLibKey + ":" + listOfPaths);
    }

    /**
     * Gets the launcherlib path.
     *
     * @return a launcher lib path under the system lib path, named after the current GMT time
     */
    private Path getLauncherlibPath() {
        String formattedDate = formatTimestamp(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime());
        Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX
                + formattedDate);
        return tmpLauncherLibPath;
    }

    /**
     * Gets the Latest lib path.
     *
     * @param rootDir the root dir
     * @param prefix the prefix
     * @return the entry of rootDir with the given prefix and the most recent timestamp, or rootDir itself if there
     *         is none
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public Path getLatestLibPath(Path rootDir, final String prefix) throws IOException {
        Date max = new Date(0L);
        Path path = null;
        PathFilter directoryFilter = new PathFilter() {
            @Override
            public boolean accept(Path path) {
                return path.getName().startsWith(prefix);
            }
        };

        FileStatus[] files = fs.listStatus(rootDir, directoryFilter);
        // Same guard as the other listStatus() callers in this class; a null listing falls back to rootDir below.
        if (files != null) {
            for (FileStatus file : files) {
                String name = file.getPath().getName().toString();
                String time = name.substring(prefix.length());
                Date d = null;
                try {
                    d = parseTimestamp(time);
                }
                catch (ParseException e) {
                    continue;
                }
                if (d.compareTo(max) > 0) {
                    path = file.getPath();
                    max = d;
                }
            }
        }
        // If there are no timestamped directories, fall back to root directory
        if (path == null) {
            path = rootDir;
        }
        return path;
    }

    /**
     * Instruments the share lib service.
     * <p>
     * It sets instrumentation variables indicating the location of the sharelib and launcherlib
     *
     * @param instr instrumentation to use.
     */
    @Override
    public void instrument(Instrumentation instr) {
        instr.addVariable("libs", "sharelib.source", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
                    return SHARELIB_MAPPING_FILE;
                }
                return WorkflowAppService.SYSTEM_LIB_PATH;
            }
        });
        instr.addVariable("libs", "sharelib.mapping.file", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
                    return sharelibMappingFile;
                }
                return "(none)";
            }
        });
        instr.addVariable("libs", "sharelib.system.libpath", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                String sharelibPath = "(unavailable)";
                try {
                    Path libPath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
                            SHARE_LIB_PREFIX);
                    if (libPath != null) {
                        sharelibPath = libPath.toUri().toString();
                    }
                }
                catch (IOException ioe) {
                    // ignore exception because we're just doing instrumentation
                }
                return sharelibPath;
            }
        });
        instr.addVariable("libs", "sharelib.mapping.file.timestamp", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                if (!StringUtils.isEmpty(sharelibMetaFileOldTimeStamp)) {
                    return sharelibMetaFileOldTimeStamp;
                }
                return "(none)";
            }
        });
        instr.addVariable("libs", "sharelib.keys", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                Map<String, List<Path>> shareLib = getShareLib();
                if (shareLib != null && !shareLib.isEmpty()) {
                    Set<String> keySet = shareLib.keySet();
                    return keySet.toString();
                }
                return "(unavailable)";
            }
        });
        instr.addVariable("libs", "launcherlib.system.libpath", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                return getLauncherlibPath().toUri().toString();
            }
        });
        instr.addVariable("libs", "sharelib.symlink.mapping", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                Map<String, Map<Path, Path>> shareLibSymlinkMapping = getSymlinkMapping();
                if (shareLibSymlinkMapping != null && !shareLibSymlinkMapping.isEmpty()) {
                    // Rendered as "<symlink>(<key>)=><target>," for every cached symlink.
                    StringBuilder bf = new StringBuilder();
                    for (Entry<String, Map<Path, Path>> entry : shareLibSymlinkMapping.entrySet()) {
                        if (entry.getKey() != null && !entry.getValue().isEmpty()) {
                            for (Entry<Path, Path> symlink : entry.getValue().entrySet()) {
                                bf.append(symlink.getKey()).append("(").append(entry.getKey()).append(")")
                                        .append("=>").append(symlink.getValue()).append(",");
                            }
                        }
                    }
                    return bf.toString();
                }
                return "(none)";
            }
        });

        instr.addVariable("libs", "sharelib.cached.config.file", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                Map<String, Map<Path, Configuration>> shareLibConfigMap = getShareLibConfigMap();
                if (shareLibConfigMap != null && !shareLibConfigMap.isEmpty()) {
                    StringBuilder bf = new StringBuilder();

                    for (String path : shareLibConfigMap.keySet()) {
                        bf.append(path).append(";");
                    }
                    return bf.toString();
                }
                return "(none)";
            }
        });

    }

    /**
     * Returns file system for shared libraries.
     * <p>
     * The file system is created for the authority of the launcher lib path during init, or lazily by the update
     * methods; it is null if it could not be created yet.
     *
     * @return file system for shared libraries
     */
    public FileSystem getFileSystem() {
        return fs;
    }

    /**
     * Cache XML conf file
     *
     * @param qualifiedHdfsPath the path the parsed configuration is cached under
     * @param hdfsPath the hdfs path the file is read from
     * @param shareLibKey the share lib key
     * @param shareLibConfigMap the map the parsed configuration is added to
     * @throws IOException Signals that an I/O exception has occurred.
     * @throws JDOMException
     */
    private void cachePropertyFile(Path qualifiedHdfsPath, Path hdfsPath, String shareLibKey,
            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException, JDOMException {
        Map<Path, Configuration> confMap = shareLibConfigMap.get(shareLibKey);
        if (confMap == null) {
            confMap = new HashMap<Path, Configuration>();
            shareLibConfigMap.put(shareLibKey, confMap);
        }
        InputStream in = fs.open(hdfsPath);
        try {
            Configuration xmlConf = new XConfiguration(in);
            confMap.put(qualifiedHdfsPath, xmlConf);
        }
        finally {
            // Release the DFS stream whether or not parsing succeeded.
            IOUtils.closeStream(in);
        }

    }

    /**
     * Collects, from every Java based action executor, the names of the share lib files that have to be cached as
     * action configuration.
     */
    private void cacheActionKeySharelibConfList() {
        ActionService actionService = Services.get().get(ActionService.class);
        Set<String> actionTypes = actionService.getActionTypes();
        for (String key : actionTypes) {
            ActionExecutor executor = actionService.getExecutor(key);
            if (executor instanceof JavaActionExecutor) {
                JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
                String[] confFiles = jexecutor.getShareLibFilesForActionConf();
                if (confFiles != null) {
                    actionConfSet.addAll(Arrays.asList(confFiles));
                }
            }
        }
    }

    /**
     * Gets a cached share lib configuration file.
     *
     * @param inputKey the share lib key
     * @param path the path the configuration was cached under
     * @return the cached configuration, or null if there is none
     */
    public Configuration getShareLibConf(String inputKey, Path path) {
        // Single lookup on a single read of the field: updateShareLib() may replace the map concurrently.
        Map<Path, Configuration> confMap = shareLibConfigMap.get(inputKey);
        if (confMap != null) {
            return confMap.get(path);
        }

        return null;
    }

    @VisibleForTesting
    public Map<String, Map<Path, Configuration>> getShareLibConfigMap() {
        return shareLibConfigMap;
    }

    /**
     * Tells whether a file is one of the action configuration files collected from the action executors.
     *
     * @param path the file path; if it carries a URI fragment, the fragment is used as the file name
     * @return true if the file name is in the action configuration file set
     * @throws URISyntaxException if the path is not a valid URI
     */
    private boolean isFilePartOfConfList(Path path) throws URISyntaxException {
        String fragmentName = new URI(path.toString()).getFragment();
        String fileName = fragmentName == null ? path.getName() : fragmentName;
        return actionConfSet.contains(fileName);
    }
}