/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLDecoder;
import java.text.MessageFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.Map.Entry;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.client.rest.JsonUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.FSUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.jdom.JDOMException;

import static org.apache.oozie.util.FSUtils.isLocalFile;

/**
 * Service that caches the jar paths of the share lib and of the launcher lib, keyed by action type.
 * <p>
 * The share lib is loaded either from a mapping file ({@link #SHARELIB_MAPPING_FILE}) or, when no mapping file is
 * configured, from the latest {@code lib_<timestamp>} directory below the system lib path. The service also schedules
 * a periodic purge of old {@code launcher_<timestamp>} and {@code lib_<timestamp>} directories.
 */
public class ShareLibService implements Service, Instrumentable {

    public static final String LAUNCHERJAR_LIB_RETENTION = CONF_PREFIX + "ShareLibService.temp.sharelib.retention.days";

    public static final String SHARELIB_MAPPING_FILE = CONF_PREFIX + "ShareLibService.mapping.file";

    public static final String SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar";

    public static final String PURGE_INTERVAL = CONF_PREFIX + "ShareLibService.purge.interval";

    public static final String FAIL_FAST_ON_STARTUP = CONF_PREFIX + "ShareLibService.fail.fast.on.startup";

    private static final String PERMISSION_STRING = "-rwxr-xr-x";

    public static final String LAUNCHER_LIB_PREFIX = "launcher_";

    public static final String SHARE_LIB_PREFIX = "lib_";

    /**
     * Timestamp format used in the names of the launcher lib and share lib directories.
     * <p>
     * {@link SimpleDateFormat} is not thread-safe. This class only touches the instance through
     * {@link #formatTimestamp(Date)} and {@link #parseTimestamp(String)}, which synchronize on it; code outside this
     * class that uses the field directly should synchronize on it as well.
     */
    public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");

    private Services services;

    private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>();

    private Map<String, Map<Path, Configuration>> shareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();

    private Map<String, List<Path>> launcherLibMap = new HashMap<String, List<Path>>();

    private Set<String> actionConfSet = new HashSet<String>();

    // Symlink mapping. Oozie keeps checking the symlink path and, if its target changes, reloads the sharelib.
    private Map<String, Map<Path, Path>> symlinkMapping = new HashMap<String, Map<Path, Path>>();

    private static final XLog LOG = XLog.getLog(ShareLibService.class);

    // Normalized (trimmed, never null) in init().
    private String sharelibMappingFile;

    private boolean isShipLauncherEnabled = false;

    public static String SHARE_LIB_CONF_PREFIX = "oozie";

    private boolean shareLibLoadAttempted = false;

    private String sharelibMetaFileOldTimeStamp;

    private String sharelibDirOld;

    FileSystem fs;
    FileSystem localFs;

    // Retention period in milliseconds (configured value is in days).
    final long retentionTime = 1000L * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION);

    /**
     * Reads the configuration, creates the file systems, populates the launcher lib and share lib caches and
     * schedules the periodic purge of old lib directories.
     * <p>
     * A failure while populating the caches is only rethrown when {@link #FAIL_FAST_ON_STARTUP} is enabled; otherwise
     * it is logged and the service still starts.
     *
     * @param services the services singleton
     * @throws ServiceException if caching fails and fail-fast is enabled
     */
    @Override
    public void init(Services services) throws ServiceException {
        this.services = services;
        // Trim once here so that file system selection and Path construction always see the same string.
        sharelibMappingFile = StringUtils.trimToEmpty(ConfigurationService.get(services.getConf(), SHARELIB_MAPPING_FILE));
        isShipLauncherEnabled = ConfigurationService.getBoolean(services.getConf(), SHIP_LAUNCHER_JAR);
        boolean failOnfailure = ConfigurationService.getBoolean(services.getConf(), FAIL_FAST_ON_STARTUP);
        Path launcherlibPath = getLauncherlibPath();
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        URI uri = launcherlibPath.toUri();
        try {
            fs = FileSystem.get(has.createConfiguration(uri.getAuthority()));
            localFs = LocalFileSystem.get(new Configuration(false));
            // cache action key sharelib conf list
            cacheActionKeySharelibConfList();
            updateLauncherLib();
            updateShareLib();
        }
        catch (Throwable e) {
            if (failOnfailure) {
                LOG.error("Sharelib initialization fails", e);
                throw new ServiceException(ErrorCode.E0104, getClass().getName(), "Sharelib initialization fails. ", e);
            }
            else {
                // We don't want to actually fail init by throwing an Exception, so only create the ServiceException
                // and log it
                ServiceException se = new ServiceException(ErrorCode.E0104, getClass().getName(),
                        "Not able to cache sharelib. An Admin needs to install the sharelib with oozie-setup.sh and issue the "
                                + "'oozie admin' CLI command to update the sharelib", e);
                LOG.error(se);
            }
        }
        Runnable purgeLibsRunnable = new Runnable() {
            @Override
            public void run() {
                System.out.flush();
                try {
                    // Only one server should purge sharelib
                    if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
                        final Date current = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
                        purgeLibs(fs, LAUNCHER_LIB_PREFIX, current);
                        purgeLibs(fs, SHARE_LIB_PREFIX, current);
                    }
                }
                catch (IOException e) {
                    LOG.error("There was an issue purging the sharelib", e);
                }
            }
        };
        // First run after 10 seconds; the configured interval is in days and is converted to seconds.
        services.get(SchedulerService.class).schedule(purgeLibsRunnable, 10,
                ConfigurationService.getInt(services.getConf(), PURGE_INTERVAL) * 60 * 60 * 24,
                SchedulerService.Unit.SEC);
    }

    /**
     * Creates {@link #fs} from the authority of the launcher lib path if it has not been created yet.
     *
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void ensureFileSystem() throws IOException {
        if (fs == null) {
            Path launcherlibPath = getLauncherlibPath();
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            URI uri = launcherlibPath.toUri();
            fs = FileSystem.get(has.createConfiguration(uri.getAuthority()));
        }
    }

    /**
     * Formats a date with {@link #dateFormat} while holding its monitor.
     *
     * @param date the date to format
     * @return the formatted timestamp
     */
    private static String formatTimestamp(Date date) {
        synchronized (dateFormat) {
            return dateFormat.format(date);
        }
    }

    /**
     * Parses a timestamp with {@link #dateFormat} while holding its monitor.
     *
     * @param timestamp the text to parse
     * @return the parsed date
     * @throws ParseException if the text does not match the timestamp format
     */
    private static Date parseTimestamp(String timestamp) throws ParseException {
        synchronized (dateFormat) {
            return dateFormat.parse(timestamp);
        }
    }

    /**
     * Uploads the launcher jars to a new launcher lib directory and sets the permissions of everything below it.
     * Does nothing unless shipping of the launcher jar is enabled.
     *
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void updateLauncherLib() throws IOException {
        if (isShipLauncherEnabled) {
            ensureFileSystem();
            Path launcherlibPath = getLauncherlibPath();
            setupLauncherLibPath(fs, launcherlibPath);
            recursiveChangePermissions(fs, launcherlibPath, FsPermission.valueOf(PERMISSION_STRING));
        }
    }

    /**
     * Copy launcher jars to Temp directory.
     * <p>
     * The common launcher classes go to the {@code JavaActionExecutor.OOZIE_COMMON_LIBDIR} sub directory; each
     * {@link JavaActionExecutor} that declares launcher classes gets a sub directory named after its type.
     *
     * @param fs the FileSystem
     * @param tmpLauncherLibPath the tmp launcher lib path
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void setupLauncherLibPath(FileSystem fs, Path tmpLauncherLibPath) throws IOException {
        ActionService actionService = Services.get().get(ActionService.class);
        List<Class<?>> classes = JavaActionExecutor.getCommonLauncherClasses();
        Path baseDir = new Path(tmpLauncherLibPath, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
        copyJarContainingClasses(classes, fs, baseDir, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
        Set<String> actionTypes = actionService.getActionTypes();
        for (String key : actionTypes) {
            ActionExecutor executor = actionService.getExecutor(key);
            if (executor instanceof JavaActionExecutor) {
                JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
                classes = jexecutor.getLauncherClasses();
                if (classes != null) {
                    String type = executor.getType();
                    Path executorDir = new Path(tmpLauncherLibPath, type);
                    copyJarContainingClasses(classes, fs, executorDir, type);
                }
            }
        }
    }

    /**
     * Recursive change permissions.
     *
     * @param fs the FileSystem
     * @param path the Path
     * @param fsPerm is permission
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void recursiveChangePermissions(FileSystem fs, Path path, FsPermission fsPerm) throws IOException {
        fs.setPermission(path, fsPerm);
        for (FileStatus child : fs.listStatus(path)) {
            if (child.isDirectory()) {
                recursiveChangePermissions(fs, child.getPath(), fsPerm);
            }
            else {
                fs.setPermission(child.getPath(), fsPerm);
            }
        }
    }

    /**
     * Copy jar containing classes.
     * <p>
     * Uploads every distinct local jar that contains one of the classes into {@code executorDir} and records the
     * uploaded paths in the launcher lib cache under {@code type}.
     *
     * @param classes the classes
     * @param fs the FileSystem
     * @param executorDir is Path
     * @param type is sharelib key
     * @throws IOException if a class cannot be located in a jar, or an I/O exception has occurred.
     */
    private void copyJarContainingClasses(List<Class<?>> classes, FileSystem fs, Path executorDir, String type)
            throws IOException {
        fs.mkdirs(executorDir);
        Set<String> localJarSet = new HashSet<String>();
        for (Class<?> c : classes) {
            String localJar = findContainingJar(c);
            if (localJar != null) {
                localJarSet.add(localJar);
            }
            else {
                throw new IOException("No jar containing " + c + " found");
            }
        }
        List<Path> listOfPaths = new ArrayList<Path>();
        for (String localJarStr : localJarSet) {
            File localJar = new File(localJarStr);
            fs.copyFromLocalFile(new Path(localJar.getPath()), executorDir);
            Path path = new Path(executorDir, localJar.getName());
            listOfPaths.add(path);
            LOG.info(localJar.getName() + " uploaded to " + executorDir.toString());
        }
        launcherLibMap.put(type, listOfPaths);
    }

    /**
     * Collects all files at or below {@code rootDir} into {@code listOfPaths}. Files whose name is part of the action
     * conf list are additionally parsed and cached in {@code shareLibConfigMap}.
     *
     * @param fs the FileSystem
     * @param rootDir the root directory; nothing is done when it is null
     * @param listOfPaths the list of paths that is appended to
     * @param shareLibKey the share lib key
     * @param shareLibConfigMap the cache of parsed configuration files, keyed by share lib key
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void getPathRecursively(FileSystem fs, Path rootDir, List<Path> listOfPaths, String shareLibKey,
            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
        if (rootDir == null) {
            return;
        }

        try {
            // Only the path component of the URI is used for the file check, so any fragment is dropped here.
            Path filePath = new Path(new URI(rootDir.toString()).getPath());
            if (fs.isFile(filePath)) {
                Path qualifiedRootDirPath = fs.makeQualified(rootDir);
                if (isFilePartOfConfList(rootDir)) {
                    cachePropertyFile(qualifiedRootDirPath, filePath, shareLibKey, shareLibConfigMap);
                }
                listOfPaths.add(qualifiedRootDirPath);
                return;
            }

            FileStatus[] status = fs.listStatus(rootDir);
            if (status == null) {
                LOG.info("Shared lib " + rootDir + " doesn't exist, not adding to cache");
                return;
            }

            for (FileStatus file : status) {
                if (file.isDirectory()) {
                    getPathRecursively(fs, file.getPath(), listOfPaths, shareLibKey, shareLibConfigMap);
                }
                else {
                    if (isFilePartOfConfList(file.getPath())) {
                        cachePropertyFile(file.getPath(), file.getPath(), shareLibKey, shareLibConfigMap);
                    }
                    listOfPaths.add(file.getPath());
                }
            }
        }
        catch (URISyntaxException | JDOMException e) {
            throw new IOException(e);
        }
    }

    /**
     * Gets the cached share lib.
     *
     * @return the map from share lib key to the list of cached paths
     */
    public Map<String, List<Path>> getShareLib() {
        return shareLibMap;
    }

    private Map<String, Map<Path, Path>> getSymlinkMapping() {
        return symlinkMapping;
    }

    /**
     * Gets the action sharelib lib jars.
     *
     * @param shareLibKey the sharelib key
     * @return List of paths, or null when nothing is cached for the key
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public List<Path> getShareLibJars(String shareLibKey) throws IOException {
        // Sharelib map is empty means that on previous or startup attempt of
        // caching sharelib has failed. Trying to reload
        if (shareLibMap.isEmpty() && !shareLibLoadAttempted) {
            synchronized (ShareLibService.class) {
                if (shareLibMap.isEmpty()) {
                    updateShareLib();
                    shareLibLoadAttempted = true;
                }
            }
        }
        checkSymlink(shareLibKey);
        return shareLibMap.get(shareLibKey);
    }

    /**
     * Reloads the share lib of {@code shareLibKey} from the mapping file when the target of one of its symlinks
     * differs from the target recorded at load time.
     *
     * @param shareLibKey the sharelib key
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void checkSymlink(final String shareLibKey) throws IOException {
        // Work on a snapshot: the symlinkMapping field may be replaced by a concurrent reload.
        final Map<Path, Path> symlinks = symlinkMapping.get(shareLibKey);
        if (symlinks == null || symlinks.isEmpty()) {
            return;
        }

        for (final Entry<Path, Path> link : symlinks.entrySet()) {
            final Path symlinkPath = link.getKey();
            final Path cachedTarget = link.getValue();
            final FileSystem fileSystem = getHostFileSystem(symlinkPath);
            final Path symLinkTarget = FSUtils.getSymLinkTarget(fileSystem, symlinkPath);
            final boolean symlinkIsNotTarget = !cachedTarget.equals(symLinkTarget);
            if (symlinkIsNotTarget) {
                synchronized (ShareLibService.class) {
                    final Map<String, List<Path>> tmpShareLibMap = new HashMap<String, List<Path>>(shareLibMap);

                    final Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<>(shareLibConfigMap);

                    final Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>(
                            symlinkMapping);

                    LOG.info(MessageFormat.format("Symlink target for [{0}] has changed, was [{1}], now [{2}]",
                            shareLibKey, cachedTarget, symLinkTarget));
                    loadShareLibMetaFile(tmpShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile,
                            shareLibKey);
                    shareLibMap = tmpShareLibMap;
                    symlinkMapping = tmpSymlinkMapping;
                    shareLibConfigMap = tmpShareLibConfigMap;
                    // The whole key has been reloaded, no need to look at its remaining symlinks.
                    return;
                }
            }
        }
    }

    /**
     * Picks the local file system for local paths and the share lib file system otherwise.
     *
     * @param pathStr the path as string
     * @return the file system hosting the path
     */
    private FileSystem getHostFileSystem(String pathStr) {
        return isLocalFile(pathStr) ? localFs : fs;
    }

    private FileSystem getHostFileSystem(Path path) {
        return getHostFileSystem(path.toString());
    }

    /**
     * Gets the launcher jars.
     * <p>
     * For the {@code JavaActionExecutor.OOZIE_COMMON_LIBDIR} key the share lib jars of that key are appended as well.
     *
     * @param shareLibKey the shareLib key
     * @return launcher jars paths
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public List<Path> getSystemLibJars(String shareLibKey) throws IOException {
        List<Path> returnList = new ArrayList<Path>();
        // Launcher lib map is empty means that on previous or startup attempt of
        // caching launcher jars has failed. Trying to reload
        if (isShipLauncherEnabled) {
            if (launcherLibMap.isEmpty()) {
                synchronized (ShareLibService.class) {
                    if (launcherLibMap.isEmpty()) {
                        updateLauncherLib();
                    }
                }
            }
            List<Path> launcherJars = launcherLibMap.get(shareLibKey);
            if (launcherJars != null) {
                returnList.addAll(launcherJars);
            }
        }
        if (shareLibKey.equals(JavaActionExecutor.OOZIE_COMMON_LIBDIR)) {
            List<Path> sharelibList = getShareLibJars(shareLibKey);
            if (sharelibList != null) {
                returnList.addAll(sharelibList);
            }
        }
        return returnList;
    }

    /**
     * Finds the local jar file that contains the given class.
     *
     * @param clazz the clazz
     * @return the decoded local path of the jar, or null if the class has no class loader or was not loaded from a
     *         {@code jar:file:} URL
     */
    @VisibleForTesting
    protected String findContainingJar(Class<?> clazz) {
        ClassLoader loader = clazz.getClassLoader();
        if (loader == null) {
            // Classes loaded by the bootstrap loader report a null class loader; there is no jar to ship for them.
            return null;
        }
        String classFile = clazz.getName().replace('.', '/') + ".class";
        try {
            for (Enumeration<URL> itr = loader.getResources(classFile); itr.hasMoreElements();) {
                URL url = itr.nextElement();
                if ("jar".equals(url.getProtocol())) {
                    String toReturn = url.getPath();
                    if (toReturn.startsWith("file:")) {
                        toReturn = toReturn.substring("file:".length());
                        // URLDecoder is a misnamed class, since it actually decodes
                        // x-www-form-urlencoded MIME type rather than actual URL encoding
                        // (which the file path has). Therefore it would decode +s to ' 's
                        // which is incorrect (spaces are actually either unencoded or
                        // encoded as "%20"). Replace +s first, so that they are kept
                        // sacred during the decoding process.
                        toReturn = toReturn.replaceAll("\\+", "%2B");
                        toReturn = URLDecoder.decode(toReturn, "UTF-8");
                        // Strip the "!/path/inside/jar" suffix.
                        toReturn = toReturn.replaceAll("!.*$", "");
                        return toReturn;
                    }
                }
            }
        }
        catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
        return null;
    }

    /**
     * Purge libs.
     * <p>
     * Of the directories below the system lib path whose name is {@code prefix} followed by a timestamp older than
     * the retention time, the newest one is kept and all others are deleted.
     *
     * @param fs the fs
     * @param prefix the prefix
     * @param current the current time
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void purgeLibs(FileSystem fs, final String prefix, final Date current) throws IOException {
        Path executorLibBasePath = services.get(WorkflowAppService.class).getSystemLibPath();
        PathFilter directoryFilter = new PathFilter() {
            @Override
            public boolean accept(Path path) {
                String name = path.getName();
                if (!name.startsWith(prefix)) {
                    return false;
                }
                Date d;
                try {
                    d = parseTimestamp(name.substring(prefix.length()));
                }
                catch (ParseException e) {
                    // Not one of our timestamped directories, leave it alone.
                    return false;
                }
                return (current.getTime() - d.getTime()) > retentionTime;
            }
        };
        FileStatus[] dirList = fs.listStatus(executorLibBasePath, directoryFilter);
        Arrays.sort(dirList, new Comparator<FileStatus>() {
            // sort in desc order
            @Override
            public int compare(FileStatus o1, FileStatus o2) {
                return o2.getPath().getName().compareTo(o1.getPath().getName());
            }
        });

        // Logic is to keep all share-lib between current timestamp and 7days old + 1 latest sharelib older than 7 days.
        // refer OOZIE-1761
        // Index 0 is the newest of the expired directories and is therefore skipped.
        for (int i = 1; i < dirList.length; i++) {
            Path dirPath = dirList[i].getPath();
            fs.delete(dirPath, true);
            LOG.info("Deleted old launcher jar lib directory {0}", dirPath.getName());
        }
    }

    @Override
    public void destroy() {
        shareLibMap.clear();
        launcherLibMap.clear();
    }

    @Override
    public Class<? extends Service> getInterface() {
        return ShareLibService.class;
    }

    /**
     * Update share lib cache.
     * <p>
     * Loads from the mapping file when one is configured, otherwise from the latest share lib directory, and then
     * replaces the cached maps.
     *
     * @return a status map describing the new and the previous source (mapping file timestamps or directories)
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public Map<String, String> updateShareLib() throws IOException {
        Map<String, String> status = new HashMap<String, String>();

        ensureFileSystem();

        Map<String, List<Path>> tempShareLibMap = new HashMap<String, List<Path>>();
        Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>();
        Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();

        if (!StringUtils.isEmpty(sharelibMappingFile)) {
            FileSystem fileSystem = getHostFileSystem(sharelibMappingFile);

            String sharelibMetaFileNewTimeStamp = JsonUtils.formatDateRfc822(
                    new Date(fileSystem.getFileStatus(new Path(sharelibMappingFile)).getModificationTime()), "GMT");

            loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, null);
            status.put("sharelibMetaFile", sharelibMappingFile);
            status.put("sharelibMetaFileNewTimeStamp", sharelibMetaFileNewTimeStamp);
            status.put("sharelibMetaFileOldTimeStamp", sharelibMetaFileOldTimeStamp);
            sharelibMetaFileOldTimeStamp = sharelibMetaFileNewTimeStamp;
        }
        else {
            Path shareLibpath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
                    SHARE_LIB_PREFIX);
            loadShareLibfromDFS(tempShareLibMap, shareLibpath, tmpShareLibConfigMap);

            if (shareLibpath != null) {
                status.put("sharelibDirNew", shareLibpath.toString());
                status.put("sharelibDirOld", sharelibDirOld);
                sharelibDirOld = shareLibpath.toString();
            }
        }
        // Publish the freshly built maps only after loading succeeded.
        shareLibMap = tempShareLibMap;
        symlinkMapping = tmpSymlinkMapping;
        shareLibConfigMap = tmpShareLibConfigMap;
        return status;
    }

    /**
     * Update share lib cache. Parse the share lib directory and each sub directory is a action key
     *
     * @param shareLibMap the share lib jar map
     * @param shareLibpath the share libpath
     * @param shareLibConfigMap the cache of parsed configuration files, keyed by share lib key
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void loadShareLibfromDFS(Map<String, List<Path>> shareLibMap, Path shareLibpath,
            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {

        if (shareLibpath == null) {
            LOG.info("No share lib directory found");
            return;
        }

        FileStatus[] dirList = fs.listStatus(shareLibpath);
        if (dirList == null) {
            return;
        }

        for (FileStatus dir : dirList) {
            if (!dir.isDirectory()) {
                continue;
            }
            String actionKey = dir.getPath().getName();
            List<Path> listOfPaths = new ArrayList<Path>();
            getPathRecursively(fs, dir.getPath(), listOfPaths, actionKey, shareLibConfigMap);
            shareLibMap.put(actionKey, listOfPaths);
            LOG.info("Share lib for " + actionKey + ":" + listOfPaths);
        }
    }

    /**
     * Load share lib text file. Sharelib mapping files contains list of key=value. where key is the action key and
     * value is the DFS location of sharelib files.
     * <p>
     * Only keys that start (case-insensitively) with {@link #SHARE_LIB_CONF_PREFIX} are considered; the action key is
     * the part of the property key after the prefix and one separator character.
     *
     * @param shareLibMap the share lib jar map
     * @param symlinkMapping the symlink mapping
     * @param shareLibConfigMap the cache of parsed configuration files, keyed by share lib key
     * @param sharelibFileMapping the sharelib file mapping
     * @param shareLibKey the share lib key to load, or null to load every key
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void loadShareLibMetaFile(Map<String, List<Path>> shareLibMap, Map<String, Map<Path, Path>> symlinkMapping,
            Map<String, Map<Path, Configuration>> shareLibConfigMap, String sharelibFileMapping, String shareLibKey)
            throws IOException {

        Path shareFileMappingPath = new Path(sharelibFileMapping);
        FileSystem filesystem = getHostFileSystem(shareFileMappingPath);

        Properties prop = new Properties();
        try (InputStream in = filesystem.open(shareFileMappingPath)) {
            prop.load(in);
        }

        final int mapKeyStart = SHARE_LIB_CONF_PREFIX.length() + 1;
        for (Object keyObject : prop.keySet()) {
            String key = (String) keyObject;
            // Check length and prefix before cutting the key, unrelated short keys must not blow up the load.
            if (key.length() < mapKeyStart || !key.toLowerCase(Locale.ROOT).startsWith(SHARE_LIB_CONF_PREFIX)) {
                continue;
            }
            String mapKey = key.substring(mapKeyStart);
            if (shareLibKey == null || shareLibKey.equals(mapKey)) {
                loadSharelib(shareLibMap, symlinkMapping, shareLibConfigMap, mapKey,
                        ((String) prop.get(key)).split(","));
            }
        }
    }

    /**
     * Loads the paths of one share lib key and records which of the configured paths are symlinks.
     *
     * @param tmpShareLibMap the share lib jar map to fill
     * @param tmpSymlinkMapping the symlink mapping to fill
     * @param shareLibConfigMap the cache of parsed configuration files, keyed by share lib key
     * @param shareLibKey the share lib key
     * @param pathList the configured locations of the key
     * @throws IOException Signals that an I/O exception has occurred.
     */
    private void loadSharelib(Map<String, List<Path>> tmpShareLibMap, Map<String, Map<Path, Path>> tmpSymlinkMapping,
            Map<String, Map<Path, Configuration>> shareLibConfigMap, String shareLibKey, String pathList[])
            throws IOException {
        List<Path> listOfPaths = new ArrayList<Path>();
        Map<Path, Path> symlinkMappingforAction = new HashMap<Path, Path>();

        for (String pathStr : pathList) {
            Path path = new Path(pathStr);
            final FileSystem fileSystem = getHostFileSystem(pathStr);

            getPathRecursively(fileSystem, path, listOfPaths, shareLibKey, shareLibConfigMap);
            if (FSUtils.isSymlink(fileSystem, path)) {
                symlinkMappingforAction.put(path, FSUtils.getSymLinkTarget(fileSystem, path));
            }
        }

        LOG.info("symlink for " + shareLibKey + ":" + symlinkMappingforAction);
        tmpSymlinkMapping.put(shareLibKey, symlinkMappingforAction);

        tmpShareLibMap.put(shareLibKey, listOfPaths);
        LOG.info("Share lib for " + shareLibKey + ":" + listOfPaths);
    }

    /**
     * Gets the launcherlib path.
     * <p>
     * The path is built from the current time, so successive calls can return different paths.
     *
     * @return the launcherlib path
     */
    private Path getLauncherlibPath() {
        String formattedDate = formatTimestamp(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime());
        return new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX
                + formattedDate);
    }

    /**
     * Gets the Latest lib path.
     *
     * @param rootDir the root dir
     * @param prefix the prefix
     * @return the entry below {@code rootDir} with the newest timestamp after {@code prefix}, or {@code rootDir}
     *         itself when there is none
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public Path getLatestLibPath(Path rootDir, final String prefix) throws IOException {
        Date max = new Date(0L);
        Path path = null;
        PathFilter directoryFilter = new PathFilter() {
            @Override
            public boolean accept(Path path) {
                return path.getName().startsWith(prefix);
            }
        };

        FileStatus[] files = fs.listStatus(rootDir, directoryFilter);
        for (FileStatus file : files) {
            String name = file.getPath().getName();
            String time = name.substring(prefix.length());
            Date d;
            try {
                d = parseTimestamp(time);
            }
            catch (ParseException e) {
                // Name has the prefix but no valid timestamp, skip it.
                continue;
            }
            if (d.compareTo(max) > 0) {
                path = file.getPath();
                max = d;
            }
        }
        // If there are no timestamped directories, fall back to root directory
        if (path == null) {
            path = rootDir;
        }
        return path;
    }

    /**
     * Instruments the share lib service.
     * <p>
     * It sets instrumentation variables indicating the location of the sharelib and launcherlib
     *
     * @param instr instrumentation to use.
     */
    @Override
    public void instrument(Instrumentation instr) {
        instr.addVariable("libs", "sharelib.source", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                if (!StringUtils.isEmpty(sharelibMappingFile)) {
                    return SHARELIB_MAPPING_FILE;
                }
                return WorkflowAppService.SYSTEM_LIB_PATH;
            }
        });
        instr.addVariable("libs", "sharelib.mapping.file", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                if (!StringUtils.isEmpty(sharelibMappingFile)) {
                    return sharelibMappingFile;
                }
                return "(none)";
            }
        });
        instr.addVariable("libs", "sharelib.system.libpath", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                String sharelibPath = "(unavailable)";
                try {
                    Path libPath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
                            SHARE_LIB_PREFIX);
                    if (libPath != null) {
                        sharelibPath = libPath.toUri().toString();
                    }
                }
                catch (IOException ignored) {
                    // ignore exception because we're just doing instrumentation
                }
                return sharelibPath;
            }
        });
        instr.addVariable("libs", "sharelib.mapping.file.timestamp", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                if (!StringUtils.isEmpty(sharelibMetaFileOldTimeStamp)) {
                    return sharelibMetaFileOldTimeStamp;
                }
                return "(none)";
            }
        });
        instr.addVariable("libs", "sharelib.keys", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                Map<String, List<Path>> shareLib = getShareLib();
                if (shareLib != null && !shareLib.isEmpty()) {
                    Set<String> keySet = shareLib.keySet();
                    return keySet.toString();
                }
                return "(unavailable)";
            }
        });
        instr.addVariable("libs", "launcherlib.system.libpath", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                return getLauncherlibPath().toUri().toString();
            }
        });
        instr.addVariable("libs", "sharelib.symlink.mapping", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                Map<String, Map<Path, Path>> shareLibSymlinkMapping = getSymlinkMapping();
                if (shareLibSymlinkMapping != null && !shareLibSymlinkMapping.isEmpty()) {
                    StringBuilder bf = new StringBuilder();
                    for (Entry<String, Map<Path, Path>> entry : shareLibSymlinkMapping.entrySet()) {
                        if (entry.getKey() != null && !entry.getValue().isEmpty()) {
                            // One "symlink(key)=>target," item per symlink.
                            for (Entry<Path, Path> link : entry.getValue().entrySet()) {
                                bf.append(link.getKey()).append("(").append(entry.getKey()).append(")").append("=>")
                                        .append(link.getValue()).append(",");
                            }
                        }
                    }
                    return bf.toString();
                }
                return "(none)";
            }
        });

        instr.addVariable("libs", "sharelib.cached.config.file", new Instrumentation.Variable<String>() {
            @Override
            public String getValue() {
                Map<String, Map<Path, Configuration>> shareLibConfigMap = getShareLibConfigMap();
                if (shareLibConfigMap != null && !shareLibConfigMap.isEmpty()) {
                    StringBuilder bf = new StringBuilder();
                    // Lists the share lib keys that have cached configuration files.
                    for (String key : shareLibConfigMap.keySet()) {
                        bf.append(key).append(";");
                    }
                    return bf.toString();
                }
                return "(none)";
            }
        });
    }

    /**
     * Returns file system for shared libraries.
     * <p>
     * If WorkflowAppService#getSystemLibPath doesn't have authority then a default one assumed
     *
     * @return file system for shared libraries
     */
    public FileSystem getFileSystem() {
        return fs;
    }

    /**
     * Cache XML conf file
     *
     * @param qualifiedHdfsPath the path under which the parsed configuration is cached
     * @param propertyFilePath the path of the property file
     * @param shareLibKey the share lib key
     * @param shareLibConfigMap the cache of parsed configuration files, keyed by share lib key
     * @throws IOException Signals that an I/O exception has occurred.
     * @throws JDOMException declared by this method; NOTE(review): presumably raised by XConfiguration parsing
     */
    private void cachePropertyFile(Path qualifiedHdfsPath, Path propertyFilePath, String shareLibKey,
            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException, JDOMException {
        Map<Path, Configuration> confMap = shareLibConfigMap.get(shareLibKey);
        if (confMap == null) {
            confMap = new HashMap<Path, Configuration>();
            shareLibConfigMap.put(shareLibKey, confMap);
        }
        FileSystem fileSystem = getHostFileSystem(propertyFilePath);
        Configuration xmlConf;
        // NOTE(review): assumes XConfiguration reads the stream completely inside its constructor - confirm.
        try (InputStream in = fileSystem.open(propertyFilePath)) {
            xmlConf = new XConfiguration(in);
        }
        confMap.put(qualifiedHdfsPath, xmlConf);
    }

    /**
     * Collects, from every {@link JavaActionExecutor}, the names of the share lib files that are to be treated as
     * action configuration files.
     */
    private void cacheActionKeySharelibConfList() {
        ActionService actionService = Services.get().get(ActionService.class);
        Set<String> actionTypes = actionService.getActionTypes();
        for (String key : actionTypes) {
            ActionExecutor executor = actionService.getExecutor(key);
            if (executor instanceof JavaActionExecutor) {
                String[] confFiles = ((JavaActionExecutor) executor).getShareLibFilesForActionConf();
                if (confFiles != null) {
                    actionConfSet.addAll(Arrays.asList(confFiles));
                }
            }
        }
    }

    /**
     * Gets the cached configuration of a share lib file.
     *
     * @param inputKey the share lib key
     * @param path the path of the configuration file
     * @return the cached configuration, or null if none is cached for the key and path
     */
    public Configuration getShareLibConf(String inputKey, Path path) {
        Map<Path, Configuration> confMap = shareLibConfigMap.get(inputKey);
        return confMap == null ? null : confMap.get(path);
    }

    @VisibleForTesting
    public Map<String, Map<Path, Configuration>> getShareLibConfigMap() {
        return shareLibConfigMap;
    }

    /**
     * Tells whether a file is one of the action configuration files. The URI fragment of the path is used as file
     * name when there is one, the last path component otherwise.
     *
     * @param path the path to check
     * @return true if the file name is part of the action conf list
     * @throws URISyntaxException if the path cannot be parsed as URI
     */
    private boolean isFilePartOfConfList(Path path) throws URISyntaxException {
        String fragmentName = new URI(path.toString()).getFragment();
        String fileName = fragmentName == null ? path.getName() : fragmentName;
        return actionConfSet.contains(fileName);
    }
}