This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.File;
022import java.io.IOException;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.net.URL;
026import java.net.URLDecoder;
027import java.text.MessageFormat;
028import java.text.ParseException;
029import java.text.SimpleDateFormat;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Calendar;
033import java.util.Comparator;
034import java.util.Date;
035import java.util.Enumeration;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Properties;
041import java.util.Set;
042import java.util.TimeZone;
043import java.util.Map.Entry;
044import org.apache.commons.lang.StringUtils;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FileStatus;
047import org.apache.hadoop.fs.FileSystem;
048import org.apache.hadoop.fs.Path;
049import org.apache.hadoop.fs.PathFilter;
050import org.apache.hadoop.fs.permission.FsPermission;
051import org.apache.oozie.action.ActionExecutor;
052import org.apache.oozie.action.hadoop.JavaActionExecutor;
053import org.apache.oozie.client.rest.JsonUtils;
054import org.apache.oozie.hadoop.utils.HadoopShims;
055import org.apache.oozie.util.Instrumentable;
056import org.apache.oozie.util.Instrumentation;
057import org.apache.oozie.util.XConfiguration;
058import org.apache.oozie.util.XLog;
059import com.google.common.annotations.VisibleForTesting;
060
061import org.apache.oozie.ErrorCode;
062import org.jdom.JDOMException;
063
064public class ShareLibService implements Service, Instrumentable {
065
066    public static final String LAUNCHERJAR_LIB_RETENTION = CONF_PREFIX + "ShareLibService.temp.sharelib.retention.days";
067
068    public static final String SHARELIB_MAPPING_FILE = CONF_PREFIX + "ShareLibService.mapping.file";
069
070    public static final String SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar";
071
072    public static final String PURGE_INTERVAL = CONF_PREFIX + "ShareLibService.purge.interval";
073
074    public static final String FAIL_FAST_ON_STARTUP = CONF_PREFIX + "ShareLibService.fail.fast.on.startup";
075
076    private static final String PERMISSION_STRING = "-rwxr-xr-x";
077
078    public static final String LAUNCHER_LIB_PREFIX = "launcher_";
079
080    public static final String SHARE_LIB_PREFIX = "lib_";
081
082    public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
083
084    private Services services;
085
086    private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>();
087
088    private Map<String, Map<Path, Configuration>> shareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();
089
090    private Map<String, List<Path>> launcherLibMap = new HashMap<String, List<Path>>();
091
092    private Set<String> actionConfSet = new HashSet<String>();
093
094    // symlink mapping. Oozie keeps on checking symlink path and if changes, Oozie reloads the sharelib
095    private Map<String, Map<Path, Path>> symlinkMapping = new HashMap<String, Map<Path, Path>>();
096
097    private static XLog LOG = XLog.getLog(ShareLibService.class);
098
099    private String sharelibMappingFile;
100
101    private boolean isShipLauncherEnabled = false;
102
103    public static String SHARE_LIB_CONF_PREFIX = "oozie";
104
105    private boolean shareLibLoadAttempted = false;
106
107    private String sharelibMetaFileOldTimeStamp;
108
109    private String sharelibDirOld;
110
111    FileSystem fs;
112
113    final long retentionTime = 1000 * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION);
114
115    @Override
116    public void init(Services services) throws ServiceException {
117        this.services = services;
118        sharelibMappingFile = ConfigurationService.get(services.getConf(), SHARELIB_MAPPING_FILE);
119        isShipLauncherEnabled = ConfigurationService.getBoolean(services.getConf(), SHIP_LAUNCHER_JAR);
120        boolean failOnfailure = ConfigurationService.getBoolean(services.getConf(), FAIL_FAST_ON_STARTUP);
121        Path launcherlibPath = getLauncherlibPath();
122        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
123        URI uri = launcherlibPath.toUri();
124        try {
125            fs = FileSystem.get(has.createJobConf(uri.getAuthority()));
126            //cache action key sharelib conf list
127            cacheActionKeySharelibConfList();
128            updateLauncherLib();
129            updateShareLib();
130        }
131        catch (Throwable e) {
132            if (failOnfailure) {
133                LOG.error("Sharelib initialization fails", e);
134                throw new ServiceException(ErrorCode.E0104, getClass().getName(), "Sharelib initialization fails. ", e);
135            }
136            else {
137                // We don't want to actually fail init by throwing an Exception, so only create the ServiceException and
138                // log it
139                ServiceException se = new ServiceException(ErrorCode.E0104, getClass().getName(),
140                        "Not able to cache sharelib. An Admin needs to install the sharelib with oozie-setup.sh and issue the "
141                                + "'oozie admin' CLI command to update the sharelib", e);
142                LOG.error(se);
143            }
144        }
145        Runnable purgeLibsRunnable = new Runnable() {
146            @Override
147            public void run() {
148                System.out.flush();
149                try {
150                    // Only one server should purge sharelib
151                    if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
152                        final Date current = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
153                        purgeLibs(fs, LAUNCHER_LIB_PREFIX, current);
154                        purgeLibs(fs, SHARE_LIB_PREFIX, current);
155                    }
156                }
157                catch (IOException e) {
158                    LOG.error("There was an issue purging the sharelib", e);
159                }
160            }
161        };
162        services.get(SchedulerService.class).schedule(purgeLibsRunnable, 10,
163                ConfigurationService.getInt(services.getConf(), PURGE_INTERVAL) * 60 * 60 * 24,
164                SchedulerService.Unit.SEC);
165    }
166
167    /**
168     * Recursively change permissions.
169     *
170     * @throws IOException Signals that an I/O exception has occurred.
171     */
172    private void updateLauncherLib() throws IOException {
173        if (isShipLauncherEnabled) {
174            if (fs == null) {
175                Path launcherlibPath = getLauncherlibPath();
176                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
177                URI uri = launcherlibPath.toUri();
178                fs = FileSystem.get(has.createJobConf(uri.getAuthority()));
179            }
180            Path launcherlibPath = getLauncherlibPath();
181            setupLauncherLibPath(fs, launcherlibPath);
182            recursiveChangePermissions(fs, launcherlibPath, FsPermission.valueOf(PERMISSION_STRING));
183        }
184
185    }
186
187    /**
188     * Copy launcher jars to Temp directory.
189     *
190     * @param fs the FileSystem
191     * @param tmpLauncherLibPath the tmp launcher lib path
192     * @throws IOException Signals that an I/O exception has occurred.
193     */
194    private void setupLauncherLibPath(FileSystem fs, Path tmpLauncherLibPath) throws IOException {
195
196        ActionService actionService = Services.get().get(ActionService.class);
197        List<Class> classes = JavaActionExecutor.getCommonLauncherClasses();
198        Path baseDir = new Path(tmpLauncherLibPath, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
199        copyJarContainingClasses(classes, fs, baseDir, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
200        Set<String> actionTypes = actionService.getActionTypes();
201        for (String key : actionTypes) {
202            ActionExecutor executor = actionService.getExecutor(key);
203            if (executor instanceof JavaActionExecutor) {
204                JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
205                classes = jexecutor.getLauncherClasses();
206                if (classes != null) {
207                    String type = executor.getType();
208                    Path executorDir = new Path(tmpLauncherLibPath, type);
209                    copyJarContainingClasses(classes, fs, executorDir, type);
210                }
211            }
212        }
213    }
214
215    /**
216     * Recursive change permissions.
217     *
218     * @param fs the FileSystem
219     * @param path the Path
220     * @param perm is permission
221     * @throws IOException Signals that an I/O exception has occurred.
222     */
223    private void recursiveChangePermissions(FileSystem fs, Path path, FsPermission fsPerm) throws IOException {
224        fs.setPermission(path, fsPerm);
225        FileStatus[] filesStatus = fs.listStatus(path);
226        for (int i = 0; i < filesStatus.length; i++) {
227            Path p = filesStatus[i].getPath();
228            if (filesStatus[i].isDir()) {
229                recursiveChangePermissions(fs, p, fsPerm);
230            }
231            else {
232                fs.setPermission(p, fsPerm);
233            }
234        }
235    }
236
237    /**
238     * Copy jar containing classes.
239     *
240     * @param classes the classes
241     * @param fs the FileSystem
242     * @param executorDir is Path
243     * @param type is sharelib key
244     * @throws IOException Signals that an I/O exception has occurred.
245     */
246    private void copyJarContainingClasses(List<Class> classes, FileSystem fs, Path executorDir, String type)
247            throws IOException {
248        fs.mkdirs(executorDir);
249        Set<String> localJarSet = new HashSet<String>();
250        for (Class c : classes) {
251            String localJar = findContainingJar(c);
252            if (localJar != null) {
253                localJarSet.add(localJar);
254            }
255            else {
256                throw new IOException("No jar containing " + c + " found");
257            }
258        }
259        List<Path> listOfPaths = new ArrayList<Path>();
260        for (String localJarStr : localJarSet) {
261            File localJar = new File(localJarStr);
262            fs.copyFromLocalFile(new Path(localJar.getPath()), executorDir);
263            Path path = new Path(executorDir, localJar.getName());
264            listOfPaths.add(path);
265            LOG.info(localJar.getName() + " uploaded to " + executorDir.toString());
266        }
267        launcherLibMap.put(type, listOfPaths);
268
269    }
270
271    /**
272     * Gets the path recursively.
273     *
274     * @param fs the FileSystem
275     * @param rootDir the root directory
276     * @param listOfPaths the list of paths
277     * @param shareLibKey the share lib key
278     * @return the path recursively
279     * @throws IOException Signals that an I/O exception has occurred.
280     */
281    private void getPathRecursively(FileSystem fs, Path rootDir, List<Path> listOfPaths, String shareLibKey,
282            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
283        if (rootDir == null) {
284            return;
285        }
286
287        try {
288            if (fs.isFile(new Path(new URI(rootDir.toString()).getPath()))) {
289                Path filePath = new Path(new URI(rootDir.toString()).getPath());
290
291                if (isFilePartOfConfList(rootDir)) {
292                    cachePropertyFile(filePath, shareLibKey, shareLibConfigMap);
293                }
294
295                listOfPaths.add(rootDir);
296                return;
297            }
298
299            FileStatus[] status = fs.listStatus(rootDir);
300            if (status == null) {
301                LOG.info("Shared lib " + rootDir + " doesn't exist, not adding to cache");
302                return;
303            }
304
305            for (FileStatus file : status) {
306                if (file.isDir()) {
307                    getPathRecursively(fs, file.getPath(), listOfPaths, shareLibKey, shareLibConfigMap);
308                }
309                else {
310                    if (isFilePartOfConfList(file.getPath())) {
311                        cachePropertyFile(file.getPath(), shareLibKey, shareLibConfigMap);
312                    }
313                    listOfPaths.add(file.getPath());
314                }
315            }
316        }
317        catch (URISyntaxException e) {
318            throw new IOException(e);
319        }
320        catch (JDOMException e) {
321            throw new IOException(e);
322        }
323    }
324
325    public Map<String, List<Path>> getShareLib() {
326        return shareLibMap;
327    }
328
329    private Map<String, Map<Path, Path>> getSymlinkMapping() {
330        return symlinkMapping;
331    }
332
333    /**
334     * Gets the action sharelib lib jars.
335     *
336     * @param shareLibKey the sharelib key
337     * @return List of paths
338     * @throws IOException Signals that an I/O exception has occurred.
339     */
340    public List<Path> getShareLibJars(String shareLibKey) throws IOException {
341        // Sharelib map is empty means that on previous or startup attempt of
342        // caching sharelib has failed.Trying to reload
343        if (shareLibMap.isEmpty() && !shareLibLoadAttempted) {
344            synchronized (ShareLibService.class) {
345                if (shareLibMap.isEmpty()) {
346                    updateShareLib();
347                    shareLibLoadAttempted = true;
348                }
349            }
350        }
351        checkSymlink(shareLibKey);
352        return shareLibMap.get(shareLibKey);
353    }
354
355    private void checkSymlink(String shareLibKey) throws IOException {
356        if (!HadoopShims.isSymlinkSupported() || symlinkMapping.get(shareLibKey) == null
357                || symlinkMapping.get(shareLibKey).isEmpty()) {
358            return;
359        }
360
361        HadoopShims fileSystem = new HadoopShims(fs);
362        for (Path path : symlinkMapping.get(shareLibKey).keySet()) {
363            if (!symlinkMapping.get(shareLibKey).get(path).equals(fileSystem.getSymLinkTarget(path))) {
364                synchronized (ShareLibService.class) {
365                    Map<String, List<Path>> tmpShareLibMap = new HashMap<String, List<Path>>(shareLibMap);
366
367                    Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>(
368                            shareLibConfigMap);
369
370                    Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>(
371                            symlinkMapping);
372
373                    LOG.info(MessageFormat.format("Symlink target for [{0}] has changed, was [{1}], now [{2}]",
374                            shareLibKey, path, fileSystem.getSymLinkTarget(path)));
375                    loadShareLibMetaFile(tmpShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile,
376                            shareLibKey);
377                    shareLibMap = tmpShareLibMap;
378                    symlinkMapping = tmpSymlinkMapping;
379                    shareLibConfigMap = tmpShareLibConfigMap;
380                    return;
381                }
382
383            }
384        }
385
386    }
387
388    /**
389     * Gets the launcher jars.
390     *
391     * @param shareLibKey the shareLib key
392     * @return launcher jars paths
393     * @throws IOException Signals that an I/O exception has occurred.
394     */
395    public List<Path> getSystemLibJars(String shareLibKey) throws IOException {
396        List<Path> returnList = new ArrayList<Path>();
397        // Sharelib map is empty means that on previous or startup attempt of
398        // caching launcher jars has failed.Trying to reload
399        if (isShipLauncherEnabled) {
400            if (launcherLibMap.isEmpty()) {
401                synchronized (ShareLibService.class) {
402                    if (launcherLibMap.isEmpty()) {
403                        updateLauncherLib();
404                    }
405                }
406            }
407            if (launcherLibMap.get(shareLibKey) != null) {
408                returnList.addAll(launcherLibMap.get(shareLibKey));
409            }
410        }
411        if (shareLibKey.equals(JavaActionExecutor.OOZIE_COMMON_LIBDIR)) {
412            List<Path> sharelibList = getShareLibJars(shareLibKey);
413            if (sharelibList != null) {
414                returnList.addAll(sharelibList);
415            }
416        }
417        return returnList;
418    }
419
420    /**
421     * Find containing jar containing.
422     *
423     * @param clazz the clazz
424     * @return the string
425     */
426    @VisibleForTesting
427    protected String findContainingJar(Class clazz) {
428        ClassLoader loader = clazz.getClassLoader();
429        String classFile = clazz.getName().replaceAll("\\.", "/") + ".class";
430        try {
431            for (Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) {
432                URL url = (URL) itr.nextElement();
433                if ("jar".equals(url.getProtocol())) {
434                    String toReturn = url.getPath();
435                    if (toReturn.startsWith("file:")) {
436                        toReturn = toReturn.substring("file:".length());
437                        // URLDecoder is a misnamed class, since it actually
438                        // decodes
439                        // x-www-form-urlencoded MIME type rather than actual
440                        // URL encoding (which the file path has). Therefore it
441                        // would
442                        // decode +s to ' 's which is incorrect (spaces are
443                        // actually
444                        // either unencoded or encoded as "%20"). Replace +s
445                        // first, so
446                        // that they are kept sacred during the decoding
447                        // process.
448                        toReturn = toReturn.replaceAll("\\+", "%2B");
449                        toReturn = URLDecoder.decode(toReturn, "UTF-8");
450                        toReturn = toReturn.replaceAll("!.*$", "");
451                        return toReturn;
452                    }
453                }
454            }
455        }
456        catch (IOException ioe) {
457            throw new RuntimeException(ioe);
458        }
459        return null;
460    }
461
462    /**
463     * Purge libs.
464     *
465     * @param fs the fs
466     * @param prefix the prefix
467     * @param current the current time
468     * @throws IOException Signals that an I/O exception has occurred.
469     */
470    private void purgeLibs(FileSystem fs, final String prefix, final Date current) throws IOException {
471        Path executorLibBasePath = services.get(WorkflowAppService.class).getSystemLibPath();
472        PathFilter directoryFilter = new PathFilter() {
473            @Override
474            public boolean accept(Path path) {
475                if (path.getName().startsWith(prefix)) {
476                    String name = path.getName();
477                    String time = name.substring(prefix.length());
478                    Date d = null;
479                    try {
480                        d = dateFormat.parse(time);
481                    }
482                    catch (ParseException e) {
483                        return false;
484                    }
485                    return (current.getTime() - d.getTime()) > retentionTime;
486                }
487                else {
488                    return false;
489                }
490            }
491        };
492        FileStatus[] dirList = fs.listStatus(executorLibBasePath, directoryFilter);
493        Arrays.sort(dirList, new Comparator<FileStatus>() {
494            // sort in desc order
495            @Override
496            public int compare(FileStatus o1, FileStatus o2) {
497                return o2.getPath().getName().compareTo(o1.getPath().getName());
498            }
499        });
500
501        // Logic is to keep all share-lib between current timestamp and 7days old + 1 latest sharelib older than 7 days.
502        // refer OOZIE-1761
503        for (int i = 1; i < dirList.length; i++) {
504            Path dirPath = dirList[i].getPath();
505            fs.delete(dirPath, true);
506            LOG.info("Deleted old launcher jar lib directory {0}", dirPath.getName());
507        }
508    }
509
510    @Override
511    public void destroy() {
512        shareLibMap.clear();
513        launcherLibMap.clear();
514    }
515
516    @Override
517    public Class<? extends Service> getInterface() {
518        return ShareLibService.class;
519    }
520
521    /**
522     * Update share lib cache.
523     *
524     * @return the map
525     * @throws IOException Signals that an I/O exception has occurred.
526     */
527    public Map<String, String> updateShareLib() throws IOException {
528        Map<String, String> status = new HashMap<String, String>();
529
530        if (fs == null) {
531            Path launcherlibPath = getLauncherlibPath();
532            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
533            URI uri = launcherlibPath.toUri();
534            fs = FileSystem.get(has.createJobConf(uri.getAuthority()));
535        }
536
537        Map<String, List<Path>> tempShareLibMap = new HashMap<String, List<Path>>();
538        Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>();
539        Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();
540
541        if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
542            String sharelibMetaFileNewTimeStamp = JsonUtils.formatDateRfc822(
543                    new Date(fs.getFileStatus(new Path(sharelibMappingFile)).getModificationTime()), "GMT");
544            loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, null);
545            status.put("sharelibMetaFile", sharelibMappingFile);
546            status.put("sharelibMetaFileNewTimeStamp", sharelibMetaFileNewTimeStamp);
547            status.put("sharelibMetaFileOldTimeStamp", sharelibMetaFileOldTimeStamp);
548            sharelibMetaFileOldTimeStamp = sharelibMetaFileNewTimeStamp;
549        }
550        else {
551            Path shareLibpath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
552                    SHARE_LIB_PREFIX);
553            loadShareLibfromDFS(tempShareLibMap, shareLibpath, tmpShareLibConfigMap);
554
555            if (shareLibpath != null) {
556                status.put("sharelibDirNew", shareLibpath.toString());
557                status.put("sharelibDirOld", sharelibDirOld);
558                sharelibDirOld = shareLibpath.toString();
559            }
560
561        }
562        shareLibMap = tempShareLibMap;
563        symlinkMapping = tmpSymlinkMapping;
564        shareLibConfigMap = tmpShareLibConfigMap;
565        return status;
566    }
567
568    /**
569     * Update share lib cache. Parse the share lib directory and each sub directory is a action key
570     *
571     * @param shareLibMap the share lib jar map
572     * @param shareLibpath the share libpath
573     * @throws IOException Signals that an I/O exception has occurred.
574     */
575    private void loadShareLibfromDFS(Map<String, List<Path>> shareLibMap, Path shareLibpath,
576            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
577
578        if (shareLibpath == null) {
579            LOG.info("No share lib directory found");
580            return;
581
582        }
583
584        FileStatus[] dirList = fs.listStatus(shareLibpath);
585
586        if (dirList == null) {
587            return;
588        }
589
590        for (FileStatus dir : dirList) {
591            if (!dir.isDir()) {
592                continue;
593            }
594            List<Path> listOfPaths = new ArrayList<Path>();
595            getPathRecursively(fs, dir.getPath(), listOfPaths, dir.getPath().getName(), shareLibConfigMap);
596            shareLibMap.put(dir.getPath().getName(), listOfPaths);
597            LOG.info("Share lib for " + dir.getPath().getName() + ":" + listOfPaths);
598
599        }
600
601    }
602
603    /**
604     * Load share lib text file. Sharelib mapping files contains list of key=value. where key is the action key and
605     * value is the DFS location of sharelib files.
606     *
607     * @param shareLibMap the share lib jar map
608     * @param symlinkMapping the symlink mapping
609     * @param sharelibFileMapping the sharelib file mapping
610     * @param shareLibKey the share lib key
611     * @throws IOException Signals that an I/O exception has occurred.
612     * @parm shareLibKey the sharelib key
613     */
614    private void loadShareLibMetaFile(Map<String, List<Path>> shareLibMap, Map<String, Map<Path, Path>> symlinkMapping,
615            Map<String, Map<Path, Configuration>> shareLibConfigMap, String sharelibFileMapping, String shareLibKey)
616            throws IOException {
617
618        Path shareFileMappingPath = new Path(sharelibFileMapping);
619        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
620        FileSystem filesystem = FileSystem.get(has.createJobConf(shareFileMappingPath.toUri().getAuthority()));
621        Properties prop = new Properties();
622        prop.load(filesystem.open(new Path(sharelibFileMapping)));
623
624        for (Object keyObject : prop.keySet()) {
625            String key = (String) keyObject;
626            String mapKey = key.substring(SHARE_LIB_CONF_PREFIX.length() + 1);
627            if (key.toLowerCase().startsWith(SHARE_LIB_CONF_PREFIX)
628                    && (shareLibKey == null || shareLibKey.equals(mapKey))) {
629                loadSharelib(shareLibMap, symlinkMapping, shareLibConfigMap, mapKey,
630                        ((String) prop.get(key)).split(","));
631            }
632        }
633    }
634
635    private void loadSharelib(Map<String, List<Path>> tmpShareLibMap, Map<String, Map<Path, Path>> tmpSymlinkMapping,
636            Map<String, Map<Path, Configuration>> shareLibConfigMap, String shareLibKey, String pathList[])
637            throws IOException {
638        List<Path> listOfPaths = new ArrayList<Path>();
639        Map<Path, Path> symlinkMappingforAction = new HashMap<Path, Path>();
640        HadoopShims fileSystem = new HadoopShims(fs);
641
642        for (String dfsPath : pathList) {
643            Path path = new Path(dfsPath);
644            getPathRecursively(fs, new Path(dfsPath), listOfPaths, shareLibKey, shareLibConfigMap);
645            if (HadoopShims.isSymlinkSupported() && fileSystem.isSymlink(path)) {
646                symlinkMappingforAction.put(path, fileSystem.getSymLinkTarget(path));
647            }
648        }
649        if (HadoopShims.isSymlinkSupported()) {
650            LOG.info("symlink for " + shareLibKey + ":" + symlinkMappingforAction);
651            tmpSymlinkMapping.put(shareLibKey, symlinkMappingforAction);
652        }
653        tmpShareLibMap.put(shareLibKey, listOfPaths);
654        LOG.info("Share lib for " + shareLibKey + ":" + listOfPaths);
655    }
656
657    /**
658     * Gets the launcherlib path.
659     *
660     * @return the launcherlib path
661     */
662    private Path getLauncherlibPath() {
663        String formattedDate = dateFormat.format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime());
664        Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX
665                + formattedDate);
666        return tmpLauncherLibPath;
667    }
668
669    /**
670     * Gets the Latest lib path.
671     *
672     * @param rootDir the root dir
673     * @param prefix the prefix
674     * @return latest lib path
675     * @throws IOException Signals that an I/O exception has occurred.
676     */
677    public Path getLatestLibPath(Path rootDir, final String prefix) throws IOException {
678        Date max = new Date(0L);
679        Path path = null;
680        PathFilter directoryFilter = new PathFilter() {
681            @Override
682            public boolean accept(Path path) {
683                return path.getName().startsWith(prefix);
684            }
685        };
686
687        FileStatus[] files = fs.listStatus(rootDir, directoryFilter);
688        for (FileStatus file : files) {
689            String name = file.getPath().getName().toString();
690            String time = name.substring(prefix.length());
691            Date d = null;
692            try {
693                d = dateFormat.parse(time);
694            }
695            catch (ParseException e) {
696                continue;
697            }
698            if (d.compareTo(max) > 0) {
699                path = file.getPath();
700                max = d;
701            }
702        }
703        // If there are no timestamped directories, fall back to root directory
704        if (path == null) {
705            path = rootDir;
706        }
707        return path;
708    }
709
710    /**
711     * Instruments the log service.
712     * <p/>
713     * It sets instrumentation variables indicating the location of the sharelib and launcherlib
714     *
715     * @param instr instrumentation to use.
716     */
717    @Override
718    public void instrument(Instrumentation instr) {
719        instr.addVariable("libs", "sharelib.source", new Instrumentation.Variable<String>() {
720            @Override
721            public String getValue() {
722                if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
723                    return SHARELIB_MAPPING_FILE;
724                }
725                return WorkflowAppService.SYSTEM_LIB_PATH;
726            }
727        });
728        instr.addVariable("libs", "sharelib.mapping.file", new Instrumentation.Variable<String>() {
729            @Override
730            public String getValue() {
731                if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
732                    return sharelibMappingFile;
733                }
734                return "(none)";
735            }
736        });
737        instr.addVariable("libs", "sharelib.system.libpath", new Instrumentation.Variable<String>() {
738            @Override
739            public String getValue() {
740                String sharelibPath = "(unavailable)";
741                try {
742                    Path libPath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
743                            SHARE_LIB_PREFIX);
744                    if (libPath != null) {
745                        sharelibPath = libPath.toUri().toString();
746                    }
747                }
748                catch (IOException ioe) {
749                    // ignore exception because we're just doing instrumentation
750                }
751                return sharelibPath;
752            }
753        });
754        instr.addVariable("libs", "sharelib.mapping.file.timestamp", new Instrumentation.Variable<String>() {
755            @Override
756            public String getValue() {
757                if (!StringUtils.isEmpty(sharelibMetaFileOldTimeStamp)) {
758                    return sharelibMetaFileOldTimeStamp;
759                }
760                return "(none)";
761            }
762        });
763        instr.addVariable("libs", "sharelib.keys", new Instrumentation.Variable<String>() {
764            @Override
765            public String getValue() {
766                Map<String, List<Path>> shareLib = getShareLib();
767                if (shareLib != null && !shareLib.isEmpty()) {
768                    Set<String> keySet = shareLib.keySet();
769                    return keySet.toString();
770                }
771                return "(unavailable)";
772            }
773        });
774        instr.addVariable("libs", "launcherlib.system.libpath", new Instrumentation.Variable<String>() {
775            @Override
776            public String getValue() {
777                return getLauncherlibPath().toUri().toString();
778            }
779        });
780        instr.addVariable("libs", "sharelib.symlink.mapping", new Instrumentation.Variable<String>() {
781            @Override
782            public String getValue() {
783                Map<String, Map<Path, Path>> shareLibSymlinkMapping = getSymlinkMapping();
784                if (shareLibSymlinkMapping != null && !shareLibSymlinkMapping.isEmpty()
785                        && shareLibSymlinkMapping.values() != null && !shareLibSymlinkMapping.values().isEmpty()) {
786                    StringBuffer bf = new StringBuffer();
787                    for (Entry<String, Map<Path, Path>> entry : shareLibSymlinkMapping.entrySet())
788                        if (entry.getKey() != null && !entry.getValue().isEmpty()) {
789                            for (Path path : entry.getValue().keySet()) {
790                                bf.append(path).append("(").append(entry.getKey()).append(")").append("=>")
791                                        .append(shareLibSymlinkMapping.get(entry.getKey()) != null ? shareLibSymlinkMapping
792                                                .get(entry.getKey()).get(path) : "").append(",");
793                            }
794                        }
795                    return bf.toString();
796                }
797                return "(none)";
798            }
799        });
800
801        instr.addVariable("libs", "sharelib.cached.config.file", new Instrumentation.Variable<String>() {
802            @Override
803            public String getValue() {
804                Map<String, Map<Path, Configuration>> shareLibConfigMap = getShareLibConfigMap();
805                if (shareLibConfigMap != null && !shareLibConfigMap.isEmpty()) {
806                    StringBuffer bf = new StringBuffer();
807
808                    for (String path : shareLibConfigMap.keySet()) {
809                        bf.append(path).append(";");
810                    }
811                    return bf.toString();
812                }
813                return "(none)";
814            }
815        });
816
817    }
818
819    /**
820     * Returns file system for shared libraries.
821     * <p/>
822     * If WorkflowAppService#getSystemLibPath doesn't have authority then a default one assumed
823     *
824     * @return file system for shared libraries
825     */
826    public FileSystem getFileSystem() {
827        return fs;
828    }
829
830    /**
831     * Cache XML conf file
832     *
833     * @param hdfsPath the hdfs path
834     * @param shareLibKey the share lib key
835     * @throws IOException Signals that an I/O exception has occurred.
836     * @throws JDOMException
837     */
838    private void cachePropertyFile(Path hdfsPath, String shareLibKey,
839            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException, JDOMException {
840        Map<Path, Configuration> confMap = shareLibConfigMap.get(shareLibKey);
841        if (confMap == null) {
842            confMap = new HashMap<Path, Configuration>();
843            shareLibConfigMap.put(shareLibKey, confMap);
844        }
845        Configuration xmlConf = new XConfiguration(fs.open(hdfsPath));
846        confMap.put(hdfsPath, xmlConf);
847
848    }
849
850    private void cacheActionKeySharelibConfList() {
851        ActionService actionService = Services.get().get(ActionService.class);
852        Set<String> actionTypes = actionService.getActionTypes();
853        for (String key : actionTypes) {
854            ActionExecutor executor = actionService.getExecutor(key);
855            if (executor instanceof JavaActionExecutor) {
856                JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
857                actionConfSet.addAll(
858                new HashSet<String>(Arrays.asList(jexecutor.getShareLibFilesForActionConf() == null ? new String[0]
859                        : jexecutor.getShareLibFilesForActionConf())));
860            }
861        }
862    }
863
864    public Configuration getShareLibConf(String inputKey, Path path) {
865        Configuration conf = new Configuration();
866        if (shareLibConfigMap.containsKey(inputKey)) {
867            conf = shareLibConfigMap.get(inputKey).get(path);
868        }
869
870        return conf;
871    }
872
873    @VisibleForTesting
874    public Map<String, Map<Path, Configuration>> getShareLibConfigMap() {
875        return shareLibConfigMap;
876    }
877
878    private boolean isFilePartOfConfList(Path path) throws URISyntaxException {
879        String fragmentName = new URI(path.toString()).getFragment();
880        String fileName = fragmentName == null ? path.getName() : fragmentName;
881        return actionConfSet.contains(fileName);
882    }
883}