001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.URL;
029import java.net.URLDecoder;
030import java.text.MessageFormat;
031import java.text.ParseException;
032import java.text.SimpleDateFormat;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Calendar;
036import java.util.Comparator;
037import java.util.Date;
038import java.util.Enumeration;
039import java.util.HashMap;
040import java.util.HashSet;
041import java.util.List;
042import java.util.Map;
043import java.util.Properties;
044import java.util.Set;
045import java.util.TimeZone;
046import java.util.Map.Entry;
047import org.apache.commons.lang.StringUtils;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.FileStatus;
050import org.apache.hadoop.fs.FileSystem;
051import org.apache.hadoop.fs.Path;
052import org.apache.hadoop.fs.PathFilter;
053import org.apache.hadoop.fs.permission.FsPermission;
054import org.apache.hadoop.io.IOUtils;
055import org.apache.oozie.action.ActionExecutor;
056import org.apache.oozie.action.hadoop.JavaActionExecutor;
057import org.apache.oozie.client.rest.JsonUtils;
058import org.apache.oozie.hadoop.utils.HadoopShims;
059import org.apache.oozie.util.Instrumentable;
060import org.apache.oozie.util.Instrumentation;
061import org.apache.oozie.util.XConfiguration;
062import org.apache.oozie.util.XLog;
063import com.google.common.annotations.VisibleForTesting;
064
065import org.apache.oozie.ErrorCode;
066import org.jdom.JDOMException;
067
068public class ShareLibService implements Service, Instrumentable {
069
070    public static final String LAUNCHERJAR_LIB_RETENTION = CONF_PREFIX + "ShareLibService.temp.sharelib.retention.days";
071
072    public static final String SHARELIB_MAPPING_FILE = CONF_PREFIX + "ShareLibService.mapping.file";
073
074    public static final String SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar";
075
076    public static final String PURGE_INTERVAL = CONF_PREFIX + "ShareLibService.purge.interval";
077
078    public static final String FAIL_FAST_ON_STARTUP = CONF_PREFIX + "ShareLibService.fail.fast.on.startup";
079
080    private static final String PERMISSION_STRING = "-rwxr-xr-x";
081
082    public static final String LAUNCHER_LIB_PREFIX = "launcher_";
083
084    public static final String SHARE_LIB_PREFIX = "lib_";
085
086    public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
087
088    private Services services;
089
090    private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>();
091
092    private Map<String, Map<Path, Configuration>> shareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();
093
094    private Map<String, List<Path>> launcherLibMap = new HashMap<String, List<Path>>();
095
096    private Set<String> actionConfSet = new HashSet<String>();
097
098    // symlink mapping. Oozie keeps on checking symlink path and if changes, Oozie reloads the sharelib
099    private Map<String, Map<Path, Path>> symlinkMapping = new HashMap<String, Map<Path, Path>>();
100
101    private static XLog LOG = XLog.getLog(ShareLibService.class);
102
103    private String sharelibMappingFile;
104
105    private boolean isShipLauncherEnabled = false;
106
107    public static String SHARE_LIB_CONF_PREFIX = "oozie";
108
109    private boolean shareLibLoadAttempted = false;
110
111    private String sharelibMetaFileOldTimeStamp;
112
113    private String sharelibDirOld;
114
115    FileSystem fs;
116
117    final long retentionTime = 1000 * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION);
118
119    @Override
120    public void init(Services services) throws ServiceException {
121        this.services = services;
122        sharelibMappingFile = ConfigurationService.get(services.getConf(), SHARELIB_MAPPING_FILE);
123        isShipLauncherEnabled = ConfigurationService.getBoolean(services.getConf(), SHIP_LAUNCHER_JAR);
124        boolean failOnfailure = ConfigurationService.getBoolean(services.getConf(), FAIL_FAST_ON_STARTUP);
125        Path launcherlibPath = getLauncherlibPath();
126        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
127        URI uri = launcherlibPath.toUri();
128        try {
129            fs = FileSystem.get(has.createJobConf(uri.getAuthority()));
130            //cache action key sharelib conf list
131            cacheActionKeySharelibConfList();
132            updateLauncherLib();
133            updateShareLib();
134        }
135        catch (Throwable e) {
136            if (failOnfailure) {
137                LOG.error("Sharelib initialization fails", e);
138                throw new ServiceException(ErrorCode.E0104, getClass().getName(), "Sharelib initialization fails. ", e);
139            }
140            else {
141                // We don't want to actually fail init by throwing an Exception, so only create the ServiceException and
142                // log it
143                ServiceException se = new ServiceException(ErrorCode.E0104, getClass().getName(),
144                        "Not able to cache sharelib. An Admin needs to install the sharelib with oozie-setup.sh and issue the "
145                                + "'oozie admin' CLI command to update the sharelib", e);
146                LOG.error(se);
147            }
148        }
149        Runnable purgeLibsRunnable = new Runnable() {
150            @Override
151            public void run() {
152                System.out.flush();
153                try {
154                    // Only one server should purge sharelib
155                    if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
156                        final Date current = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
157                        purgeLibs(fs, LAUNCHER_LIB_PREFIX, current);
158                        purgeLibs(fs, SHARE_LIB_PREFIX, current);
159                    }
160                }
161                catch (IOException e) {
162                    LOG.error("There was an issue purging the sharelib", e);
163                }
164            }
165        };
166        services.get(SchedulerService.class).schedule(purgeLibsRunnable, 10,
167                ConfigurationService.getInt(services.getConf(), PURGE_INTERVAL) * 60 * 60 * 24,
168                SchedulerService.Unit.SEC);
169    }
170
171    /**
172     * Recursively change permissions.
173     *
174     * @throws IOException Signals that an I/O exception has occurred.
175     */
176    private void updateLauncherLib() throws IOException {
177        if (isShipLauncherEnabled) {
178            if (fs == null) {
179                Path launcherlibPath = getLauncherlibPath();
180                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
181                URI uri = launcherlibPath.toUri();
182                fs = FileSystem.get(has.createJobConf(uri.getAuthority()));
183            }
184            Path launcherlibPath = getLauncherlibPath();
185            setupLauncherLibPath(fs, launcherlibPath);
186            recursiveChangePermissions(fs, launcherlibPath, FsPermission.valueOf(PERMISSION_STRING));
187        }
188
189    }
190
191    /**
192     * Copy launcher jars to Temp directory.
193     *
194     * @param fs the FileSystem
195     * @param tmpLauncherLibPath the tmp launcher lib path
196     * @throws IOException Signals that an I/O exception has occurred.
197     */
198    private void setupLauncherLibPath(FileSystem fs, Path tmpLauncherLibPath) throws IOException {
199
200        ActionService actionService = Services.get().get(ActionService.class);
201        List<Class> classes = JavaActionExecutor.getCommonLauncherClasses();
202        Path baseDir = new Path(tmpLauncherLibPath, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
203        copyJarContainingClasses(classes, fs, baseDir, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
204        Set<String> actionTypes = actionService.getActionTypes();
205        for (String key : actionTypes) {
206            ActionExecutor executor = actionService.getExecutor(key);
207            if (executor instanceof JavaActionExecutor) {
208                JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
209                classes = jexecutor.getLauncherClasses();
210                if (classes != null) {
211                    String type = executor.getType();
212                    Path executorDir = new Path(tmpLauncherLibPath, type);
213                    copyJarContainingClasses(classes, fs, executorDir, type);
214                }
215            }
216        }
217    }
218
219    /**
220     * Recursive change permissions.
221     *
222     * @param fs the FileSystem
223     * @param path the Path
224     * @param perm is permission
225     * @throws IOException Signals that an I/O exception has occurred.
226     */
227    private void recursiveChangePermissions(FileSystem fs, Path path, FsPermission fsPerm) throws IOException {
228        fs.setPermission(path, fsPerm);
229        FileStatus[] filesStatus = fs.listStatus(path);
230        for (int i = 0; i < filesStatus.length; i++) {
231            Path p = filesStatus[i].getPath();
232            if (filesStatus[i].isDir()) {
233                recursiveChangePermissions(fs, p, fsPerm);
234            }
235            else {
236                fs.setPermission(p, fsPerm);
237            }
238        }
239    }
240
241    /**
242     * Copy jar containing classes.
243     *
244     * @param classes the classes
245     * @param fs the FileSystem
246     * @param executorDir is Path
247     * @param type is sharelib key
248     * @throws IOException Signals that an I/O exception has occurred.
249     */
250    private void copyJarContainingClasses(List<Class> classes, FileSystem fs, Path executorDir, String type)
251            throws IOException {
252        fs.mkdirs(executorDir);
253        Set<String> localJarSet = new HashSet<String>();
254        for (Class c : classes) {
255            String localJar = findContainingJar(c);
256            if (localJar != null) {
257                localJarSet.add(localJar);
258            }
259            else {
260                throw new IOException("No jar containing " + c + " found");
261            }
262        }
263        List<Path> listOfPaths = new ArrayList<Path>();
264        for (String localJarStr : localJarSet) {
265            File localJar = new File(localJarStr);
266            copyFromLocalFile(localJar, fs, executorDir);
267            Path path = new Path(executorDir, localJar.getName());
268            listOfPaths.add(path);
269            LOG.info(localJar.getName() + " uploaded to " + executorDir.toString());
270        }
271        launcherLibMap.put(type, listOfPaths);
272
273    }
274
275    private static boolean copyFromLocalFile(File src, FileSystem dstFS, Path dstDir) throws IOException {
276      Path dst = new Path(dstDir, src.getName());
277      InputStream in=null;
278      OutputStream out = null;
279      try {
280        in = new FileInputStream(src);
281        out = dstFS.create(dst, true);
282        IOUtils.copyBytes(in, out, dstFS.getConf(), true);
283      } catch (IOException e) {
284        IOUtils.closeStream(out);
285        IOUtils.closeStream(in);
286        throw e;
287      }
288      return true;
289
290    }
291
292    /**
293     * Gets the path recursively.
294     *
295     * @param fs the FileSystem
296     * @param rootDir the root directory
297     * @param listOfPaths the list of paths
298     * @param shareLibKey the share lib key
299     * @return the path recursively
300     * @throws IOException Signals that an I/O exception has occurred.
301     */
302    private void getPathRecursively(FileSystem fs, Path rootDir, List<Path> listOfPaths, String shareLibKey,
303            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
304        if (rootDir == null) {
305            return;
306        }
307
308        try {
309            if (fs.isFile(new Path(new URI(rootDir.toString()).getPath()))) {
310                Path filePath = new Path(new URI(rootDir.toString()).getPath());
311                Path qualifiedRootDirPath = fs.makeQualified(rootDir);
312                if (isFilePartOfConfList(rootDir)) {
313                    cachePropertyFile(qualifiedRootDirPath, filePath, shareLibKey, shareLibConfigMap);
314                }
315                listOfPaths.add(qualifiedRootDirPath);
316                return;
317            }
318
319            FileStatus[] status = fs.listStatus(rootDir);
320            if (status == null) {
321                LOG.info("Shared lib " + rootDir + " doesn't exist, not adding to cache");
322                return;
323            }
324
325            for (FileStatus file : status) {
326                if (file.isDir()) {
327                    getPathRecursively(fs, file.getPath(), listOfPaths, shareLibKey, shareLibConfigMap);
328                }
329                else {
330                    if (isFilePartOfConfList(file.getPath())) {
331                        cachePropertyFile(file.getPath(), file.getPath(), shareLibKey, shareLibConfigMap);
332                    }
333                    listOfPaths.add(file.getPath());
334                }
335            }
336        }
337        catch (URISyntaxException e) {
338            throw new IOException(e);
339        }
340        catch (JDOMException e) {
341            throw new IOException(e);
342        }
343    }
344
345    public Map<String, List<Path>> getShareLib() {
346        return shareLibMap;
347    }
348
349    private Map<String, Map<Path, Path>> getSymlinkMapping() {
350        return symlinkMapping;
351    }
352
353    /**
354     * Gets the action sharelib lib jars.
355     *
356     * @param shareLibKey the sharelib key
357     * @return List of paths
358     * @throws IOException Signals that an I/O exception has occurred.
359     */
360    public List<Path> getShareLibJars(String shareLibKey) throws IOException {
361        // Sharelib map is empty means that on previous or startup attempt of
362        // caching sharelib has failed.Trying to reload
363        if (shareLibMap.isEmpty() && !shareLibLoadAttempted) {
364            synchronized (ShareLibService.class) {
365                if (shareLibMap.isEmpty()) {
366                    updateShareLib();
367                    shareLibLoadAttempted = true;
368                }
369            }
370        }
371        checkSymlink(shareLibKey);
372        return shareLibMap.get(shareLibKey);
373    }
374
375    private void checkSymlink(String shareLibKey) throws IOException {
376        if (!HadoopShims.isSymlinkSupported() || symlinkMapping.get(shareLibKey) == null
377                || symlinkMapping.get(shareLibKey).isEmpty()) {
378            return;
379        }
380
381        HadoopShims fileSystem = new HadoopShims(fs);
382        for (Path path : symlinkMapping.get(shareLibKey).keySet()) {
383            if (!symlinkMapping.get(shareLibKey).get(path).equals(fileSystem.getSymLinkTarget(path))) {
384                synchronized (ShareLibService.class) {
385                    Map<String, List<Path>> tmpShareLibMap = new HashMap<String, List<Path>>(shareLibMap);
386
387                    Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>(
388                            shareLibConfigMap);
389
390                    Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>(
391                            symlinkMapping);
392
393                    LOG.info(MessageFormat.format("Symlink target for [{0}] has changed, was [{1}], now [{2}]",
394                            shareLibKey, path, fileSystem.getSymLinkTarget(path)));
395                    loadShareLibMetaFile(tmpShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile,
396                            shareLibKey);
397                    shareLibMap = tmpShareLibMap;
398                    symlinkMapping = tmpSymlinkMapping;
399                    shareLibConfigMap = tmpShareLibConfigMap;
400                    return;
401                }
402
403            }
404        }
405
406    }
407
408    /**
409     * Gets the launcher jars.
410     *
411     * @param shareLibKey the shareLib key
412     * @return launcher jars paths
413     * @throws IOException Signals that an I/O exception has occurred.
414     */
415    public List<Path> getSystemLibJars(String shareLibKey) throws IOException {
416        List<Path> returnList = new ArrayList<Path>();
417        // Sharelib map is empty means that on previous or startup attempt of
418        // caching launcher jars has failed.Trying to reload
419        if (isShipLauncherEnabled) {
420            if (launcherLibMap.isEmpty()) {
421                synchronized (ShareLibService.class) {
422                    if (launcherLibMap.isEmpty()) {
423                        updateLauncherLib();
424                    }
425                }
426            }
427            if (launcherLibMap.get(shareLibKey) != null) {
428                returnList.addAll(launcherLibMap.get(shareLibKey));
429            }
430        }
431        if (shareLibKey.equals(JavaActionExecutor.OOZIE_COMMON_LIBDIR)) {
432            List<Path> sharelibList = getShareLibJars(shareLibKey);
433            if (sharelibList != null) {
434                returnList.addAll(sharelibList);
435            }
436        }
437        return returnList;
438    }
439
440    /**
441     * Find containing jar containing.
442     *
443     * @param clazz the clazz
444     * @return the string
445     */
446    @VisibleForTesting
447    protected String findContainingJar(Class clazz) {
448        ClassLoader loader = clazz.getClassLoader();
449        String classFile = clazz.getName().replaceAll("\\.", "/") + ".class";
450        try {
451            for (Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) {
452                URL url = (URL) itr.nextElement();
453                if ("jar".equals(url.getProtocol())) {
454                    String toReturn = url.getPath();
455                    if (toReturn.startsWith("file:")) {
456                        toReturn = toReturn.substring("file:".length());
457                        // URLDecoder is a misnamed class, since it actually
458                        // decodes
459                        // x-www-form-urlencoded MIME type rather than actual
460                        // URL encoding (which the file path has). Therefore it
461                        // would
462                        // decode +s to ' 's which is incorrect (spaces are
463                        // actually
464                        // either unencoded or encoded as "%20"). Replace +s
465                        // first, so
466                        // that they are kept sacred during the decoding
467                        // process.
468                        toReturn = toReturn.replaceAll("\\+", "%2B");
469                        toReturn = URLDecoder.decode(toReturn, "UTF-8");
470                        toReturn = toReturn.replaceAll("!.*$", "");
471                        return toReturn;
472                    }
473                }
474            }
475        }
476        catch (IOException ioe) {
477            throw new RuntimeException(ioe);
478        }
479        return null;
480    }
481
482    /**
483     * Purge libs.
484     *
485     * @param fs the fs
486     * @param prefix the prefix
487     * @param current the current time
488     * @throws IOException Signals that an I/O exception has occurred.
489     */
490    private void purgeLibs(FileSystem fs, final String prefix, final Date current) throws IOException {
491        Path executorLibBasePath = services.get(WorkflowAppService.class).getSystemLibPath();
492        PathFilter directoryFilter = new PathFilter() {
493            @Override
494            public boolean accept(Path path) {
495                if (path.getName().startsWith(prefix)) {
496                    String name = path.getName();
497                    String time = name.substring(prefix.length());
498                    Date d = null;
499                    try {
500                        d = dateFormat.parse(time);
501                    }
502                    catch (ParseException e) {
503                        return false;
504                    }
505                    return (current.getTime() - d.getTime()) > retentionTime;
506                }
507                else {
508                    return false;
509                }
510            }
511        };
512        FileStatus[] dirList = fs.listStatus(executorLibBasePath, directoryFilter);
513        Arrays.sort(dirList, new Comparator<FileStatus>() {
514            // sort in desc order
515            @Override
516            public int compare(FileStatus o1, FileStatus o2) {
517                return o2.getPath().getName().compareTo(o1.getPath().getName());
518            }
519        });
520
521        // Logic is to keep all share-lib between current timestamp and 7days old + 1 latest sharelib older than 7 days.
522        // refer OOZIE-1761
523        for (int i = 1; i < dirList.length; i++) {
524            Path dirPath = dirList[i].getPath();
525            fs.delete(dirPath, true);
526            LOG.info("Deleted old launcher jar lib directory {0}", dirPath.getName());
527        }
528    }
529
530    @Override
531    public void destroy() {
532        shareLibMap.clear();
533        launcherLibMap.clear();
534    }
535
536    @Override
537    public Class<? extends Service> getInterface() {
538        return ShareLibService.class;
539    }
540
541    /**
542     * Update share lib cache.
543     *
544     * @return the map
545     * @throws IOException Signals that an I/O exception has occurred.
546     */
547    public Map<String, String> updateShareLib() throws IOException {
548        Map<String, String> status = new HashMap<String, String>();
549
550        if (fs == null) {
551            Path launcherlibPath = getLauncherlibPath();
552            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
553            URI uri = launcherlibPath.toUri();
554            fs = FileSystem.get(has.createJobConf(uri.getAuthority()));
555        }
556
557        Map<String, List<Path>> tempShareLibMap = new HashMap<String, List<Path>>();
558        Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>();
559        Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();
560
561        if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
562            String sharelibMetaFileNewTimeStamp = JsonUtils.formatDateRfc822(
563                    new Date(fs.getFileStatus(new Path(sharelibMappingFile)).getModificationTime()), "GMT");
564            loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, null);
565            status.put("sharelibMetaFile", sharelibMappingFile);
566            status.put("sharelibMetaFileNewTimeStamp", sharelibMetaFileNewTimeStamp);
567            status.put("sharelibMetaFileOldTimeStamp", sharelibMetaFileOldTimeStamp);
568            sharelibMetaFileOldTimeStamp = sharelibMetaFileNewTimeStamp;
569        }
570        else {
571            Path shareLibpath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
572                    SHARE_LIB_PREFIX);
573            loadShareLibfromDFS(tempShareLibMap, shareLibpath, tmpShareLibConfigMap);
574
575            if (shareLibpath != null) {
576                status.put("sharelibDirNew", shareLibpath.toString());
577                status.put("sharelibDirOld", sharelibDirOld);
578                sharelibDirOld = shareLibpath.toString();
579            }
580
581        }
582        shareLibMap = tempShareLibMap;
583        symlinkMapping = tmpSymlinkMapping;
584        shareLibConfigMap = tmpShareLibConfigMap;
585        return status;
586    }
587
588    /**
589     * Update share lib cache. Parse the share lib directory and each sub directory is a action key
590     *
591     * @param shareLibMap the share lib jar map
592     * @param shareLibpath the share libpath
593     * @throws IOException Signals that an I/O exception has occurred.
594     */
595    private void loadShareLibfromDFS(Map<String, List<Path>> shareLibMap, Path shareLibpath,
596            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
597
598        if (shareLibpath == null) {
599            LOG.info("No share lib directory found");
600            return;
601
602        }
603
604        FileStatus[] dirList = fs.listStatus(shareLibpath);
605
606        if (dirList == null) {
607            return;
608        }
609
610        for (FileStatus dir : dirList) {
611            if (!dir.isDir()) {
612                continue;
613            }
614            List<Path> listOfPaths = new ArrayList<Path>();
615            getPathRecursively(fs, dir.getPath(), listOfPaths, dir.getPath().getName(), shareLibConfigMap);
616            shareLibMap.put(dir.getPath().getName(), listOfPaths);
617            LOG.info("Share lib for " + dir.getPath().getName() + ":" + listOfPaths);
618
619        }
620
621    }
622
623    /**
624     * Load share lib text file. Sharelib mapping files contains list of key=value. where key is the action key and
625     * value is the DFS location of sharelib files.
626     *
627     * @param shareLibMap the share lib jar map
628     * @param symlinkMapping the symlink mapping
629     * @param sharelibFileMapping the sharelib file mapping
630     * @param shareLibKey the share lib key
631     * @throws IOException Signals that an I/O exception has occurred.
632     * @parm shareLibKey the sharelib key
633     */
634    private void loadShareLibMetaFile(Map<String, List<Path>> shareLibMap, Map<String, Map<Path, Path>> symlinkMapping,
635            Map<String, Map<Path, Configuration>> shareLibConfigMap, String sharelibFileMapping, String shareLibKey)
636            throws IOException {
637
638        Path shareFileMappingPath = new Path(sharelibFileMapping);
639        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
640        FileSystem filesystem = FileSystem.get(has.createJobConf(shareFileMappingPath.toUri().getAuthority()));
641        Properties prop = new Properties();
642        prop.load(filesystem.open(new Path(sharelibFileMapping)));
643
644        for (Object keyObject : prop.keySet()) {
645            String key = (String) keyObject;
646            String mapKey = key.substring(SHARE_LIB_CONF_PREFIX.length() + 1);
647            if (key.toLowerCase().startsWith(SHARE_LIB_CONF_PREFIX)
648                    && (shareLibKey == null || shareLibKey.equals(mapKey))) {
649                loadSharelib(shareLibMap, symlinkMapping, shareLibConfigMap, mapKey,
650                        ((String) prop.get(key)).split(","));
651            }
652        }
653    }
654
655    private void loadSharelib(Map<String, List<Path>> tmpShareLibMap, Map<String, Map<Path, Path>> tmpSymlinkMapping,
656            Map<String, Map<Path, Configuration>> shareLibConfigMap, String shareLibKey, String pathList[])
657            throws IOException {
658        List<Path> listOfPaths = new ArrayList<Path>();
659        Map<Path, Path> symlinkMappingforAction = new HashMap<Path, Path>();
660        HadoopShims fileSystem = new HadoopShims(fs);
661
662        for (String dfsPath : pathList) {
663            Path path = new Path(dfsPath);
664            getPathRecursively(fs, new Path(dfsPath), listOfPaths, shareLibKey, shareLibConfigMap);
665            if (HadoopShims.isSymlinkSupported() && fileSystem.isSymlink(path)) {
666                symlinkMappingforAction.put(fs.makeQualified(path), fileSystem.getSymLinkTarget(path));
667            }
668        }
669        if (HadoopShims.isSymlinkSupported()) {
670            LOG.info("symlink for " + shareLibKey + ":" + symlinkMappingforAction);
671            tmpSymlinkMapping.put(shareLibKey, symlinkMappingforAction);
672        }
673        tmpShareLibMap.put(shareLibKey, listOfPaths);
674        LOG.info("Share lib for " + shareLibKey + ":" + listOfPaths);
675    }
676
677    /**
678     * Gets the launcherlib path.
679     *
680     * @return the launcherlib path
681     */
682    private Path getLauncherlibPath() {
683        String formattedDate = dateFormat.format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime());
684        Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX
685                + formattedDate);
686        return tmpLauncherLibPath;
687    }
688
689    /**
690     * Gets the Latest lib path.
691     *
692     * @param rootDir the root dir
693     * @param prefix the prefix
694     * @return latest lib path
695     * @throws IOException Signals that an I/O exception has occurred.
696     */
697    public Path getLatestLibPath(Path rootDir, final String prefix) throws IOException {
698        Date max = new Date(0L);
699        Path path = null;
700        PathFilter directoryFilter = new PathFilter() {
701            @Override
702            public boolean accept(Path path) {
703                return path.getName().startsWith(prefix);
704            }
705        };
706
707        FileStatus[] files = fs.listStatus(rootDir, directoryFilter);
708        for (FileStatus file : files) {
709            String name = file.getPath().getName().toString();
710            String time = name.substring(prefix.length());
711            Date d = null;
712            try {
713                d = dateFormat.parse(time);
714            }
715            catch (ParseException e) {
716                continue;
717            }
718            if (d.compareTo(max) > 0) {
719                path = file.getPath();
720                max = d;
721            }
722        }
723        // If there are no timestamped directories, fall back to root directory
724        if (path == null) {
725            path = rootDir;
726        }
727        return path;
728    }
729
730    /**
731     * Instruments the log service.
732     * <p>
733     * It sets instrumentation variables indicating the location of the sharelib and launcherlib
734     *
735     * @param instr instrumentation to use.
736     */
737    @Override
738    public void instrument(Instrumentation instr) {
739        instr.addVariable("libs", "sharelib.source", new Instrumentation.Variable<String>() {
740            @Override
741            public String getValue() {
742                if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
743                    return SHARELIB_MAPPING_FILE;
744                }
745                return WorkflowAppService.SYSTEM_LIB_PATH;
746            }
747        });
748        instr.addVariable("libs", "sharelib.mapping.file", new Instrumentation.Variable<String>() {
749            @Override
750            public String getValue() {
751                if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
752                    return sharelibMappingFile;
753                }
754                return "(none)";
755            }
756        });
757        instr.addVariable("libs", "sharelib.system.libpath", new Instrumentation.Variable<String>() {
758            @Override
759            public String getValue() {
760                String sharelibPath = "(unavailable)";
761                try {
762                    Path libPath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
763                            SHARE_LIB_PREFIX);
764                    if (libPath != null) {
765                        sharelibPath = libPath.toUri().toString();
766                    }
767                }
768                catch (IOException ioe) {
769                    // ignore exception because we're just doing instrumentation
770                }
771                return sharelibPath;
772            }
773        });
774        instr.addVariable("libs", "sharelib.mapping.file.timestamp", new Instrumentation.Variable<String>() {
775            @Override
776            public String getValue() {
777                if (!StringUtils.isEmpty(sharelibMetaFileOldTimeStamp)) {
778                    return sharelibMetaFileOldTimeStamp;
779                }
780                return "(none)";
781            }
782        });
783        instr.addVariable("libs", "sharelib.keys", new Instrumentation.Variable<String>() {
784            @Override
785            public String getValue() {
786                Map<String, List<Path>> shareLib = getShareLib();
787                if (shareLib != null && !shareLib.isEmpty()) {
788                    Set<String> keySet = shareLib.keySet();
789                    return keySet.toString();
790                }
791                return "(unavailable)";
792            }
793        });
794        instr.addVariable("libs", "launcherlib.system.libpath", new Instrumentation.Variable<String>() {
795            @Override
796            public String getValue() {
797                return getLauncherlibPath().toUri().toString();
798            }
799        });
800        instr.addVariable("libs", "sharelib.symlink.mapping", new Instrumentation.Variable<String>() {
801            @Override
802            public String getValue() {
803                Map<String, Map<Path, Path>> shareLibSymlinkMapping = getSymlinkMapping();
804                if (shareLibSymlinkMapping != null && !shareLibSymlinkMapping.isEmpty()
805                        && shareLibSymlinkMapping.values() != null && !shareLibSymlinkMapping.values().isEmpty()) {
806                    StringBuffer bf = new StringBuffer();
807                    for (Entry<String, Map<Path, Path>> entry : shareLibSymlinkMapping.entrySet()) {
808                        if (entry.getKey() != null && !entry.getValue().isEmpty()) {
809                            for (Path path : entry.getValue().keySet()) {
810                                bf.append(path).append("(").append(entry.getKey()).append(")").append("=>")
811                                        .append(shareLibSymlinkMapping.get(entry.getKey()) != null ? shareLibSymlinkMapping
812                                                .get(entry.getKey()).get(path) : "").append(",");
813                            }
814                        }
815                    }
816                    return bf.toString();
817                }
818                return "(none)";
819            }
820        });
821
822        instr.addVariable("libs", "sharelib.cached.config.file", new Instrumentation.Variable<String>() {
823            @Override
824            public String getValue() {
825                Map<String, Map<Path, Configuration>> shareLibConfigMap = getShareLibConfigMap();
826                if (shareLibConfigMap != null && !shareLibConfigMap.isEmpty()) {
827                    StringBuffer bf = new StringBuffer();
828
829                    for (String path : shareLibConfigMap.keySet()) {
830                        bf.append(path).append(";");
831                    }
832                    return bf.toString();
833                }
834                return "(none)";
835            }
836        });
837
838    }
839
840    /**
841     * Returns file system for shared libraries.
842     * <p>
843     * If WorkflowAppService#getSystemLibPath doesn't have authority then a default one assumed
844     *
845     * @return file system for shared libraries
846     */
847    public FileSystem getFileSystem() {
848        return fs;
849    }
850
851    /**
852     * Cache XML conf file
853     *
854     * @param hdfsPath the hdfs path
855     * @param shareLibKey the share lib key
856     * @throws IOException Signals that an I/O exception has occurred.
857     * @throws JDOMException
858     */
859    private void cachePropertyFile(Path qualifiedHdfsPath, Path hdfsPath, String shareLibKey,
860            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException, JDOMException {
861        Map<Path, Configuration> confMap = shareLibConfigMap.get(shareLibKey);
862        if (confMap == null) {
863            confMap = new HashMap<Path, Configuration>();
864            shareLibConfigMap.put(shareLibKey, confMap);
865        }
866        Configuration xmlConf = new XConfiguration(fs.open(hdfsPath));
867        confMap.put(qualifiedHdfsPath, xmlConf);
868
869    }
870
871    private void cacheActionKeySharelibConfList() {
872        ActionService actionService = Services.get().get(ActionService.class);
873        Set<String> actionTypes = actionService.getActionTypes();
874        for (String key : actionTypes) {
875            ActionExecutor executor = actionService.getExecutor(key);
876            if (executor instanceof JavaActionExecutor) {
877                JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
878                actionConfSet.addAll(
879                new HashSet<String>(Arrays.asList(jexecutor.getShareLibFilesForActionConf() == null ? new String[0]
880                        : jexecutor.getShareLibFilesForActionConf())));
881            }
882        }
883    }
884
885    public Configuration getShareLibConf(String inputKey, Path path) {
886        if (shareLibConfigMap.containsKey(inputKey)) {
887            return shareLibConfigMap.get(inputKey).get(path);
888        }
889
890        return null;
891    }
892
893    @VisibleForTesting
894    public Map<String, Map<Path, Configuration>> getShareLibConfigMap() {
895        return shareLibConfigMap;
896    }
897
898    private boolean isFilePartOfConfList(Path path) throws URISyntaxException {
899        String fragmentName = new URI(path.toString()).getFragment();
900        String fileName = fragmentName == null ? path.getName() : fragmentName;
901        return actionConfSet.contains(fileName);
902    }
903}