001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.io.File; 022import java.io.IOException; 023import java.net.URI; 024import java.net.URISyntaxException; 025import java.net.URL; 026import java.net.URLDecoder; 027import java.text.MessageFormat; 028import java.text.ParseException; 029import java.text.SimpleDateFormat; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.Calendar; 033import java.util.Comparator; 034import java.util.Date; 035import java.util.Enumeration; 036import java.util.HashMap; 037import java.util.HashSet; 038import java.util.List; 039import java.util.Map; 040import java.util.Properties; 041import java.util.Set; 042import java.util.TimeZone; 043import java.util.Map.Entry; 044import org.apache.commons.lang.StringUtils; 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.fs.FileStatus; 047import org.apache.hadoop.fs.FileSystem; 048import org.apache.hadoop.fs.Path; 049import org.apache.hadoop.fs.PathFilter; 050import org.apache.hadoop.fs.permission.FsPermission; 051import org.apache.oozie.action.ActionExecutor; 052import org.apache.oozie.action.hadoop.JavaActionExecutor; 053import org.apache.oozie.client.rest.JsonUtils; 054import org.apache.oozie.hadoop.utils.HadoopShims; 055import org.apache.oozie.util.Instrumentable; 056import org.apache.oozie.util.Instrumentation; 057import org.apache.oozie.util.XConfiguration; 058import org.apache.oozie.util.XLog; 059import com.google.common.annotations.VisibleForTesting; 060 061import org.apache.oozie.ErrorCode; 062import org.jdom.JDOMException; 063 064public class ShareLibService implements Service, Instrumentable { 065 066 public static final String LAUNCHERJAR_LIB_RETENTION = CONF_PREFIX + "ShareLibService.temp.sharelib.retention.days"; 067 068 public static final String SHARELIB_MAPPING_FILE = CONF_PREFIX + "ShareLibService.mapping.file"; 069 070 public static final String SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar"; 071 072 public static final String PURGE_INTERVAL = CONF_PREFIX + "ShareLibService.purge.interval"; 073 074 public static final String FAIL_FAST_ON_STARTUP = CONF_PREFIX + "ShareLibService.fail.fast.on.startup"; 075 076 private static final String PERMISSION_STRING = "-rwxr-xr-x"; 077 078 public static final String LAUNCHER_LIB_PREFIX = "launcher_"; 079 080 public static final String SHARE_LIB_PREFIX = "lib_"; 081 082 public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); 083 084 private Services services; 085 086 private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>(); 087 088 private Map<String, Map<Path, Configuration>> shareLibConfigMap = new HashMap<String, Map<Path, Configuration>>(); 089 090 private Map<String, List<Path>> launcherLibMap = new HashMap<String, List<Path>>(); 091 092 private Set<String> actionConfSet = new HashSet<String>(); 093 094 // symlink mapping. Oozie keeps on checking symlink path and if changes, Oozie reloads the sharelib 095 private Map<String, Map<Path, Path>> symlinkMapping = new HashMap<String, Map<Path, Path>>(); 096 097 private static XLog LOG = XLog.getLog(ShareLibService.class); 098 099 private String sharelibMappingFile; 100 101 private boolean isShipLauncherEnabled = false; 102 103 public static String SHARE_LIB_CONF_PREFIX = "oozie"; 104 105 private boolean shareLibLoadAttempted = false; 106 107 private String sharelibMetaFileOldTimeStamp; 108 109 private String sharelibDirOld; 110 111 FileSystem fs; 112 113 final long retentionTime = 1000 * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION); 114 115 @Override 116 public void init(Services services) throws ServiceException { 117 this.services = services; 118 sharelibMappingFile = ConfigurationService.get(services.getConf(), SHARELIB_MAPPING_FILE); 119 isShipLauncherEnabled = ConfigurationService.getBoolean(services.getConf(), SHIP_LAUNCHER_JAR); 120 boolean failOnfailure = ConfigurationService.getBoolean(services.getConf(), FAIL_FAST_ON_STARTUP); 121 Path launcherlibPath = getLauncherlibPath(); 122 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 123 URI uri = launcherlibPath.toUri(); 124 try { 125 fs = FileSystem.get(has.createJobConf(uri.getAuthority())); 126 //cache action key sharelib conf list 127 cacheActionKeySharelibConfList(); 128 updateLauncherLib(); 129 updateShareLib(); 130 } 131 catch (Throwable e) { 132 if (failOnfailure) { 133 LOG.error("Sharelib initialization fails", e); 134 throw new ServiceException(ErrorCode.E0104, getClass().getName(), "Sharelib initialization fails. ", e); 135 } 136 else { 137 // We don't want to actually fail init by throwing an Exception, so only create the ServiceException and 138 // log it 139 ServiceException se = new ServiceException(ErrorCode.E0104, getClass().getName(), 140 "Not able to cache sharelib. An Admin needs to install the sharelib with oozie-setup.sh and issue the " 141 + "'oozie admin' CLI command to update the sharelib", e); 142 LOG.error(se); 143 } 144 } 145 Runnable purgeLibsRunnable = new Runnable() { 146 @Override 147 public void run() { 148 System.out.flush(); 149 try { 150 // Only one server should purge sharelib 151 if (Services.get().get(JobsConcurrencyService.class).isLeader()) { 152 final Date current = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime(); 153 purgeLibs(fs, LAUNCHER_LIB_PREFIX, current); 154 purgeLibs(fs, SHARE_LIB_PREFIX, current); 155 } 156 } 157 catch (IOException e) { 158 LOG.error("There was an issue purging the sharelib", e); 159 } 160 } 161 }; 162 services.get(SchedulerService.class).schedule(purgeLibsRunnable, 10, 163 ConfigurationService.getInt(services.getConf(), PURGE_INTERVAL) * 60 * 60 * 24, 164 SchedulerService.Unit.SEC); 165 } 166 167 /** 168 * Recursively change permissions. 169 * 170 * @throws IOException Signals that an I/O exception has occurred. 171 */ 172 private void updateLauncherLib() throws IOException { 173 if (isShipLauncherEnabled) { 174 if (fs == null) { 175 Path launcherlibPath = getLauncherlibPath(); 176 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 177 URI uri = launcherlibPath.toUri(); 178 fs = FileSystem.get(has.createJobConf(uri.getAuthority())); 179 } 180 Path launcherlibPath = getLauncherlibPath(); 181 setupLauncherLibPath(fs, launcherlibPath); 182 recursiveChangePermissions(fs, launcherlibPath, FsPermission.valueOf(PERMISSION_STRING)); 183 } 184 185 } 186 187 /** 188 * Copy launcher jars to Temp directory. 189 * 190 * @param fs the FileSystem 191 * @param tmpLauncherLibPath the tmp launcher lib path 192 * @throws IOException Signals that an I/O exception has occurred. 193 */ 194 private void setupLauncherLibPath(FileSystem fs, Path tmpLauncherLibPath) throws IOException { 195 196 ActionService actionService = Services.get().get(ActionService.class); 197 List<Class> classes = JavaActionExecutor.getCommonLauncherClasses(); 198 Path baseDir = new Path(tmpLauncherLibPath, JavaActionExecutor.OOZIE_COMMON_LIBDIR); 199 copyJarContainingClasses(classes, fs, baseDir, JavaActionExecutor.OOZIE_COMMON_LIBDIR); 200 Set<String> actionTypes = actionService.getActionTypes(); 201 for (String key : actionTypes) { 202 ActionExecutor executor = actionService.getExecutor(key); 203 if (executor instanceof JavaActionExecutor) { 204 JavaActionExecutor jexecutor = (JavaActionExecutor) executor; 205 classes = jexecutor.getLauncherClasses(); 206 if (classes != null) { 207 String type = executor.getType(); 208 Path executorDir = new Path(tmpLauncherLibPath, type); 209 copyJarContainingClasses(classes, fs, executorDir, type); 210 } 211 } 212 } 213 } 214 215 /** 216 * Recursive change permissions. 217 * 218 * @param fs the FileSystem 219 * @param path the Path 220 * @param perm is permission 221 * @throws IOException Signals that an I/O exception has occurred. 222 */ 223 private void recursiveChangePermissions(FileSystem fs, Path path, FsPermission fsPerm) throws IOException { 224 fs.setPermission(path, fsPerm); 225 FileStatus[] filesStatus = fs.listStatus(path); 226 for (int i = 0; i < filesStatus.length; i++) { 227 Path p = filesStatus[i].getPath(); 228 if (filesStatus[i].isDir()) { 229 recursiveChangePermissions(fs, p, fsPerm); 230 } 231 else { 232 fs.setPermission(p, fsPerm); 233 } 234 } 235 } 236 237 /** 238 * Copy jar containing classes. 239 * 240 * @param classes the classes 241 * @param fs the FileSystem 242 * @param executorDir is Path 243 * @param type is sharelib key 244 * @throws IOException Signals that an I/O exception has occurred. 245 */ 246 private void copyJarContainingClasses(List<Class> classes, FileSystem fs, Path executorDir, String type) 247 throws IOException { 248 fs.mkdirs(executorDir); 249 Set<String> localJarSet = new HashSet<String>(); 250 for (Class c : classes) { 251 String localJar = findContainingJar(c); 252 if (localJar != null) { 253 localJarSet.add(localJar); 254 } 255 else { 256 throw new IOException("No jar containing " + c + " found"); 257 } 258 } 259 List<Path> listOfPaths = new ArrayList<Path>(); 260 for (String localJarStr : localJarSet) { 261 File localJar = new File(localJarStr); 262 fs.copyFromLocalFile(new Path(localJar.getPath()), executorDir); 263 Path path = new Path(executorDir, localJar.getName()); 264 listOfPaths.add(path); 265 LOG.info(localJar.getName() + " uploaded to " + executorDir.toString()); 266 } 267 launcherLibMap.put(type, listOfPaths); 268 269 } 270 271 /** 272 * Gets the path recursively. 273 * 274 * @param fs the FileSystem 275 * @param rootDir the root directory 276 * @param listOfPaths the list of paths 277 * @param shareLibKey the share lib key 278 * @return the path recursively 279 * @throws IOException Signals that an I/O exception has occurred. 280 */ 281 private void getPathRecursively(FileSystem fs, Path rootDir, List<Path> listOfPaths, String shareLibKey, 282 Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException { 283 if (rootDir == null) { 284 return; 285 } 286 287 try { 288 if (fs.isFile(new Path(new URI(rootDir.toString()).getPath()))) { 289 Path filePath = new Path(new URI(rootDir.toString()).getPath()); 290 Path qualifiedRootDirPath = fs.makeQualified(rootDir); 291 if (isFilePartOfConfList(rootDir)) { 292 cachePropertyFile(qualifiedRootDirPath, filePath, shareLibKey, shareLibConfigMap); 293 } 294 listOfPaths.add(qualifiedRootDirPath); 295 return; 296 } 297 298 FileStatus[] status = fs.listStatus(rootDir); 299 if (status == null) { 300 LOG.info("Shared lib " + rootDir + " doesn't exist, not adding to cache"); 301 return; 302 } 303 304 for (FileStatus file : status) { 305 if (file.isDir()) { 306 getPathRecursively(fs, file.getPath(), listOfPaths, shareLibKey, shareLibConfigMap); 307 } 308 else { 309 if (isFilePartOfConfList(file.getPath())) { 310 cachePropertyFile(file.getPath(), file.getPath(), shareLibKey, shareLibConfigMap); 311 } 312 listOfPaths.add(file.getPath()); 313 } 314 } 315 } 316 catch (URISyntaxException e) { 317 throw new IOException(e); 318 } 319 catch (JDOMException e) { 320 throw new IOException(e); 321 } 322 } 323 324 public Map<String, List<Path>> getShareLib() { 325 return shareLibMap; 326 } 327 328 private Map<String, Map<Path, Path>> getSymlinkMapping() { 329 return symlinkMapping; 330 } 331 332 /** 333 * Gets the action sharelib lib jars. 334 * 335 * @param shareLibKey the sharelib key 336 * @return List of paths 337 * @throws IOException Signals that an I/O exception has occurred. 338 */ 339 public List<Path> getShareLibJars(String shareLibKey) throws IOException { 340 // Sharelib map is empty means that on previous or startup attempt of 341 // caching sharelib has failed.Trying to reload 342 if (shareLibMap.isEmpty() && !shareLibLoadAttempted) { 343 synchronized (ShareLibService.class) { 344 if (shareLibMap.isEmpty()) { 345 updateShareLib(); 346 shareLibLoadAttempted = true; 347 } 348 } 349 } 350 checkSymlink(shareLibKey); 351 return shareLibMap.get(shareLibKey); 352 } 353 354 private void checkSymlink(String shareLibKey) throws IOException { 355 if (!HadoopShims.isSymlinkSupported() || symlinkMapping.get(shareLibKey) == null 356 || symlinkMapping.get(shareLibKey).isEmpty()) { 357 return; 358 } 359 360 HadoopShims fileSystem = new HadoopShims(fs); 361 for (Path path : symlinkMapping.get(shareLibKey).keySet()) { 362 if (!symlinkMapping.get(shareLibKey).get(path).equals(fileSystem.getSymLinkTarget(path))) { 363 synchronized (ShareLibService.class) { 364 Map<String, List<Path>> tmpShareLibMap = new HashMap<String, List<Path>>(shareLibMap); 365 366 Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>( 367 shareLibConfigMap); 368 369 Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>( 370 symlinkMapping); 371 372 LOG.info(MessageFormat.format("Symlink target for [{0}] has changed, was [{1}], now [{2}]", 373 shareLibKey, path, fileSystem.getSymLinkTarget(path))); 374 loadShareLibMetaFile(tmpShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, 375 shareLibKey); 376 shareLibMap = tmpShareLibMap; 377 symlinkMapping = tmpSymlinkMapping; 378 shareLibConfigMap = tmpShareLibConfigMap; 379 return; 380 } 381 382 } 383 } 384 385 } 386 387 /** 388 * Gets the launcher jars. 389 * 390 * @param shareLibKey the shareLib key 391 * @return launcher jars paths 392 * @throws IOException Signals that an I/O exception has occurred. 393 */ 394 public List<Path> getSystemLibJars(String shareLibKey) throws IOException { 395 List<Path> returnList = new ArrayList<Path>(); 396 // Sharelib map is empty means that on previous or startup attempt of 397 // caching launcher jars has failed.Trying to reload 398 if (isShipLauncherEnabled) { 399 if (launcherLibMap.isEmpty()) { 400 synchronized (ShareLibService.class) { 401 if (launcherLibMap.isEmpty()) { 402 updateLauncherLib(); 403 } 404 } 405 } 406 if (launcherLibMap.get(shareLibKey) != null) { 407 returnList.addAll(launcherLibMap.get(shareLibKey)); 408 } 409 } 410 if (shareLibKey.equals(JavaActionExecutor.OOZIE_COMMON_LIBDIR)) { 411 List<Path> sharelibList = getShareLibJars(shareLibKey); 412 if (sharelibList != null) { 413 returnList.addAll(sharelibList); 414 } 415 } 416 return returnList; 417 } 418 419 /** 420 * Find containing jar containing. 421 * 422 * @param clazz the clazz 423 * @return the string 424 */ 425 @VisibleForTesting 426 protected String findContainingJar(Class clazz) { 427 ClassLoader loader = clazz.getClassLoader(); 428 String classFile = clazz.getName().replaceAll("\\.", "/") + ".class"; 429 try { 430 for (Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) { 431 URL url = (URL) itr.nextElement(); 432 if ("jar".equals(url.getProtocol())) { 433 String toReturn = url.getPath(); 434 if (toReturn.startsWith("file:")) { 435 toReturn = toReturn.substring("file:".length()); 436 // URLDecoder is a misnamed class, since it actually 437 // decodes 438 // x-www-form-urlencoded MIME type rather than actual 439 // URL encoding (which the file path has). Therefore it 440 // would 441 // decode +s to ' 's which is incorrect (spaces are 442 // actually 443 // either unencoded or encoded as "%20"). Replace +s 444 // first, so 445 // that they are kept sacred during the decoding 446 // process. 447 toReturn = toReturn.replaceAll("\\+", "%2B"); 448 toReturn = URLDecoder.decode(toReturn, "UTF-8"); 449 toReturn = toReturn.replaceAll("!.*$", ""); 450 return toReturn; 451 } 452 } 453 } 454 } 455 catch (IOException ioe) { 456 throw new RuntimeException(ioe); 457 } 458 return null; 459 } 460 461 /** 462 * Purge libs. 463 * 464 * @param fs the fs 465 * @param prefix the prefix 466 * @param current the current time 467 * @throws IOException Signals that an I/O exception has occurred. 468 */ 469 private void purgeLibs(FileSystem fs, final String prefix, final Date current) throws IOException { 470 Path executorLibBasePath = services.get(WorkflowAppService.class).getSystemLibPath(); 471 PathFilter directoryFilter = new PathFilter() { 472 @Override 473 public boolean accept(Path path) { 474 if (path.getName().startsWith(prefix)) { 475 String name = path.getName(); 476 String time = name.substring(prefix.length()); 477 Date d = null; 478 try { 479 d = dateFormat.parse(time); 480 } 481 catch (ParseException e) { 482 return false; 483 } 484 return (current.getTime() - d.getTime()) > retentionTime; 485 } 486 else { 487 return false; 488 } 489 } 490 }; 491 FileStatus[] dirList = fs.listStatus(executorLibBasePath, directoryFilter); 492 Arrays.sort(dirList, new Comparator<FileStatus>() { 493 // sort in desc order 494 @Override 495 public int compare(FileStatus o1, FileStatus o2) { 496 return o2.getPath().getName().compareTo(o1.getPath().getName()); 497 } 498 }); 499 500 // Logic is to keep all share-lib between current timestamp and 7days old + 1 latest sharelib older than 7 days. 501 // refer OOZIE-1761 502 for (int i = 1; i < dirList.length; i++) { 503 Path dirPath = dirList[i].getPath(); 504 fs.delete(dirPath, true); 505 LOG.info("Deleted old launcher jar lib directory {0}", dirPath.getName()); 506 } 507 } 508 509 @Override 510 public void destroy() { 511 shareLibMap.clear(); 512 launcherLibMap.clear(); 513 } 514 515 @Override 516 public Class<? extends Service> getInterface() { 517 return ShareLibService.class; 518 } 519 520 /** 521 * Update share lib cache. 522 * 523 * @return the map 524 * @throws IOException Signals that an I/O exception has occurred. 525 */ 526 public Map<String, String> updateShareLib() throws IOException { 527 Map<String, String> status = new HashMap<String, String>(); 528 529 if (fs == null) { 530 Path launcherlibPath = getLauncherlibPath(); 531 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 532 URI uri = launcherlibPath.toUri(); 533 fs = FileSystem.get(has.createJobConf(uri.getAuthority())); 534 } 535 536 Map<String, List<Path>> tempShareLibMap = new HashMap<String, List<Path>>(); 537 Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>(); 538 Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>(); 539 540 if (!StringUtils.isEmpty(sharelibMappingFile.trim())) { 541 String sharelibMetaFileNewTimeStamp = JsonUtils.formatDateRfc822( 542 new Date(fs.getFileStatus(new Path(sharelibMappingFile)).getModificationTime()), "GMT"); 543 loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, null); 544 status.put("sharelibMetaFile", sharelibMappingFile); 545 status.put("sharelibMetaFileNewTimeStamp", sharelibMetaFileNewTimeStamp); 546 status.put("sharelibMetaFileOldTimeStamp", sharelibMetaFileOldTimeStamp); 547 sharelibMetaFileOldTimeStamp = sharelibMetaFileNewTimeStamp; 548 } 549 else { 550 Path shareLibpath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(), 551 SHARE_LIB_PREFIX); 552 loadShareLibfromDFS(tempShareLibMap, shareLibpath, tmpShareLibConfigMap); 553 554 if (shareLibpath != null) { 555 status.put("sharelibDirNew", shareLibpath.toString()); 556 status.put("sharelibDirOld", sharelibDirOld); 557 sharelibDirOld = shareLibpath.toString(); 558 } 559 560 } 561 shareLibMap = tempShareLibMap; 562 symlinkMapping = tmpSymlinkMapping; 563 shareLibConfigMap = tmpShareLibConfigMap; 564 return status; 565 } 566 567 /** 568 * Update share lib cache. Parse the share lib directory and each sub directory is a action key 569 * 570 * @param shareLibMap the share lib jar map 571 * @param shareLibpath the share libpath 572 * @throws IOException Signals that an I/O exception has occurred. 573 */ 574 private void loadShareLibfromDFS(Map<String, List<Path>> shareLibMap, Path shareLibpath, 575 Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException { 576 577 if (shareLibpath == null) { 578 LOG.info("No share lib directory found"); 579 return; 580 581 } 582 583 FileStatus[] dirList = fs.listStatus(shareLibpath); 584 585 if (dirList == null) { 586 return; 587 } 588 589 for (FileStatus dir : dirList) { 590 if (!dir.isDir()) { 591 continue; 592 } 593 List<Path> listOfPaths = new ArrayList<Path>(); 594 getPathRecursively(fs, dir.getPath(), listOfPaths, dir.getPath().getName(), shareLibConfigMap); 595 shareLibMap.put(dir.getPath().getName(), listOfPaths); 596 LOG.info("Share lib for " + dir.getPath().getName() + ":" + listOfPaths); 597 598 } 599 600 } 601 602 /** 603 * Load share lib text file. Sharelib mapping files contains list of key=value. where key is the action key and 604 * value is the DFS location of sharelib files. 605 * 606 * @param shareLibMap the share lib jar map 607 * @param symlinkMapping the symlink mapping 608 * @param sharelibFileMapping the sharelib file mapping 609 * @param shareLibKey the share lib key 610 * @throws IOException Signals that an I/O exception has occurred. 611 * @parm shareLibKey the sharelib key 612 */ 613 private void loadShareLibMetaFile(Map<String, List<Path>> shareLibMap, Map<String, Map<Path, Path>> symlinkMapping, 614 Map<String, Map<Path, Configuration>> shareLibConfigMap, String sharelibFileMapping, String shareLibKey) 615 throws IOException { 616 617 Path shareFileMappingPath = new Path(sharelibFileMapping); 618 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 619 FileSystem filesystem = FileSystem.get(has.createJobConf(shareFileMappingPath.toUri().getAuthority())); 620 Properties prop = new Properties(); 621 prop.load(filesystem.open(new Path(sharelibFileMapping))); 622 623 for (Object keyObject : prop.keySet()) { 624 String key = (String) keyObject; 625 String mapKey = key.substring(SHARE_LIB_CONF_PREFIX.length() + 1); 626 if (key.toLowerCase().startsWith(SHARE_LIB_CONF_PREFIX) 627 && (shareLibKey == null || shareLibKey.equals(mapKey))) { 628 loadSharelib(shareLibMap, symlinkMapping, shareLibConfigMap, mapKey, 629 ((String) prop.get(key)).split(",")); 630 } 631 } 632 } 633 634 private void loadSharelib(Map<String, List<Path>> tmpShareLibMap, Map<String, Map<Path, Path>> tmpSymlinkMapping, 635 Map<String, Map<Path, Configuration>> shareLibConfigMap, String shareLibKey, String pathList[]) 636 throws IOException { 637 List<Path> listOfPaths = new ArrayList<Path>(); 638 Map<Path, Path> symlinkMappingforAction = new HashMap<Path, Path>(); 639 HadoopShims fileSystem = new HadoopShims(fs); 640 641 for (String dfsPath : pathList) { 642 Path path = new Path(dfsPath); 643 getPathRecursively(fs, new Path(dfsPath), listOfPaths, shareLibKey, shareLibConfigMap); 644 if (HadoopShims.isSymlinkSupported() && fileSystem.isSymlink(path)) { 645 symlinkMappingforAction.put(fs.makeQualified(path), fileSystem.getSymLinkTarget(path)); 646 } 647 } 648 if (HadoopShims.isSymlinkSupported()) { 649 LOG.info("symlink for " + shareLibKey + ":" + symlinkMappingforAction); 650 tmpSymlinkMapping.put(shareLibKey, symlinkMappingforAction); 651 } 652 tmpShareLibMap.put(shareLibKey, listOfPaths); 653 LOG.info("Share lib for " + shareLibKey + ":" + listOfPaths); 654 } 655 656 /** 657 * Gets the launcherlib path. 658 * 659 * @return the launcherlib path 660 */ 661 private Path getLauncherlibPath() { 662 String formattedDate = dateFormat.format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime()); 663 Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX 664 + formattedDate); 665 return tmpLauncherLibPath; 666 } 667 668 /** 669 * Gets the Latest lib path. 670 * 671 * @param rootDir the root dir 672 * @param prefix the prefix 673 * @return latest lib path 674 * @throws IOException Signals that an I/O exception has occurred. 675 */ 676 public Path getLatestLibPath(Path rootDir, final String prefix) throws IOException { 677 Date max = new Date(0L); 678 Path path = null; 679 PathFilter directoryFilter = new PathFilter() { 680 @Override 681 public boolean accept(Path path) { 682 return path.getName().startsWith(prefix); 683 } 684 }; 685 686 FileStatus[] files = fs.listStatus(rootDir, directoryFilter); 687 for (FileStatus file : files) { 688 String name = file.getPath().getName().toString(); 689 String time = name.substring(prefix.length()); 690 Date d = null; 691 try { 692 d = dateFormat.parse(time); 693 } 694 catch (ParseException e) { 695 continue; 696 } 697 if (d.compareTo(max) > 0) { 698 path = file.getPath(); 699 max = d; 700 } 701 } 702 // If there are no timestamped directories, fall back to root directory 703 if (path == null) { 704 path = rootDir; 705 } 706 return path; 707 } 708 709 /** 710 * Instruments the log service. 711 * <p> 712 * It sets instrumentation variables indicating the location of the sharelib and launcherlib 713 * 714 * @param instr instrumentation to use. 715 */ 716 @Override 717 public void instrument(Instrumentation instr) { 718 instr.addVariable("libs", "sharelib.source", new Instrumentation.Variable<String>() { 719 @Override 720 public String getValue() { 721 if (!StringUtils.isEmpty(sharelibMappingFile.trim())) { 722 return SHARELIB_MAPPING_FILE; 723 } 724 return WorkflowAppService.SYSTEM_LIB_PATH; 725 } 726 }); 727 instr.addVariable("libs", "sharelib.mapping.file", new Instrumentation.Variable<String>() { 728 @Override 729 public String getValue() { 730 if (!StringUtils.isEmpty(sharelibMappingFile.trim())) { 731 return sharelibMappingFile; 732 } 733 return "(none)"; 734 } 735 }); 736 instr.addVariable("libs", "sharelib.system.libpath", new Instrumentation.Variable<String>() { 737 @Override 738 public String getValue() { 739 String sharelibPath = "(unavailable)"; 740 try { 741 Path libPath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(), 742 SHARE_LIB_PREFIX); 743 if (libPath != null) { 744 sharelibPath = libPath.toUri().toString(); 745 } 746 } 747 catch (IOException ioe) { 748 // ignore exception because we're just doing instrumentation 749 } 750 return sharelibPath; 751 } 752 }); 753 instr.addVariable("libs", "sharelib.mapping.file.timestamp", new Instrumentation.Variable<String>() { 754 @Override 755 public String getValue() { 756 if (!StringUtils.isEmpty(sharelibMetaFileOldTimeStamp)) { 757 return sharelibMetaFileOldTimeStamp; 758 } 759 return "(none)"; 760 } 761 }); 762 instr.addVariable("libs", "sharelib.keys", new Instrumentation.Variable<String>() { 763 @Override 764 public String getValue() { 765 Map<String, List<Path>> shareLib = getShareLib(); 766 if (shareLib != null && !shareLib.isEmpty()) { 767 Set<String> keySet = shareLib.keySet(); 768 return keySet.toString(); 769 } 770 return "(unavailable)"; 771 } 772 }); 773 instr.addVariable("libs", "launcherlib.system.libpath", new Instrumentation.Variable<String>() { 774 @Override 775 public String getValue() { 776 return getLauncherlibPath().toUri().toString(); 777 } 778 }); 779 instr.addVariable("libs", "sharelib.symlink.mapping", new Instrumentation.Variable<String>() { 780 @Override 781 public String getValue() { 782 Map<String, Map<Path, Path>> shareLibSymlinkMapping = getSymlinkMapping(); 783 if (shareLibSymlinkMapping != null && !shareLibSymlinkMapping.isEmpty() 784 && shareLibSymlinkMapping.values() != null && !shareLibSymlinkMapping.values().isEmpty()) { 785 StringBuffer bf = new StringBuffer(); 786 for (Entry<String, Map<Path, Path>> entry : shareLibSymlinkMapping.entrySet()) 787 if (entry.getKey() != null && !entry.getValue().isEmpty()) { 788 for (Path path : entry.getValue().keySet()) { 789 bf.append(path).append("(").append(entry.getKey()).append(")").append("=>") 790 .append(shareLibSymlinkMapping.get(entry.getKey()) != null ? shareLibSymlinkMapping 791 .get(entry.getKey()).get(path) : "").append(","); 792 } 793 } 794 return bf.toString(); 795 } 796 return "(none)"; 797 } 798 }); 799 800 instr.addVariable("libs", "sharelib.cached.config.file", new Instrumentation.Variable<String>() { 801 @Override 802 public String getValue() { 803 Map<String, Map<Path, Configuration>> shareLibConfigMap = getShareLibConfigMap(); 804 if (shareLibConfigMap != null && !shareLibConfigMap.isEmpty()) { 805 StringBuffer bf = new StringBuffer(); 806 807 for (String path : shareLibConfigMap.keySet()) { 808 bf.append(path).append(";"); 809 } 810 return bf.toString(); 811 } 812 return "(none)"; 813 } 814 }); 815 816 } 817 818 /** 819 * Returns file system for shared libraries. 820 * <p> 821 * If WorkflowAppService#getSystemLibPath doesn't have authority then a default one assumed 822 * 823 * @return file system for shared libraries 824 */ 825 public FileSystem getFileSystem() { 826 return fs; 827 } 828 829 /** 830 * Cache XML conf file 831 * 832 * @param hdfsPath the hdfs path 833 * @param shareLibKey the share lib key 834 * @throws IOException Signals that an I/O exception has occurred. 835 * @throws JDOMException 836 */ 837 private void cachePropertyFile(Path qualifiedHdfsPath, Path hdfsPath, String shareLibKey, 838 Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException, JDOMException { 839 Map<Path, Configuration> confMap = shareLibConfigMap.get(shareLibKey); 840 if (confMap == null) { 841 confMap = new HashMap<Path, Configuration>(); 842 shareLibConfigMap.put(shareLibKey, confMap); 843 } 844 Configuration xmlConf = new XConfiguration(fs.open(hdfsPath)); 845 confMap.put(qualifiedHdfsPath, xmlConf); 846 847 } 848 849 private void cacheActionKeySharelibConfList() { 850 ActionService actionService = Services.get().get(ActionService.class); 851 Set<String> actionTypes = actionService.getActionTypes(); 852 for (String key : actionTypes) { 853 ActionExecutor executor = actionService.getExecutor(key); 854 if (executor instanceof JavaActionExecutor) { 855 JavaActionExecutor jexecutor = (JavaActionExecutor) executor; 856 actionConfSet.addAll( 857 new HashSet<String>(Arrays.asList(jexecutor.getShareLibFilesForActionConf() == null ? new String[0] 858 : jexecutor.getShareLibFilesForActionConf()))); 859 } 860 } 861 } 862 863 public Configuration getShareLibConf(String inputKey, Path path) { 864 if (shareLibConfigMap.containsKey(inputKey)) { 865 return shareLibConfigMap.get(inputKey).get(path); 866 } 867 868 return null; 869 } 870 871 @VisibleForTesting 872 public Map<String, Map<Path, Configuration>> getShareLibConfigMap() { 873 return shareLibConfigMap; 874 } 875 876 private boolean isFilePartOfConfList(Path path) throws URISyntaxException { 877 String fragmentName = new URI(path.toString()).getFragment(); 878 String fileName = fragmentName == null ? path.getName() : fragmentName; 879 return actionConfSet.contains(fileName); 880 } 881}