001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.File;
022import java.io.IOException;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.net.URL;
026import java.net.URLDecoder;
027import java.text.MessageFormat;
028import java.text.ParseException;
029import java.text.SimpleDateFormat;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Calendar;
033import java.util.Comparator;
034import java.util.Date;
035import java.util.Enumeration;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Properties;
041import java.util.Set;
042import java.util.TimeZone;
043import java.util.Map.Entry;
044import org.apache.commons.lang.StringUtils;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FileStatus;
047import org.apache.hadoop.fs.FileSystem;
048import org.apache.hadoop.fs.Path;
049import org.apache.hadoop.fs.PathFilter;
050import org.apache.hadoop.fs.permission.FsPermission;
051import org.apache.oozie.action.ActionExecutor;
052import org.apache.oozie.action.hadoop.JavaActionExecutor;
053import org.apache.oozie.client.rest.JsonUtils;
054import org.apache.oozie.hadoop.utils.HadoopShims;
055import org.apache.oozie.util.Instrumentable;
056import org.apache.oozie.util.Instrumentation;
057import org.apache.oozie.util.XConfiguration;
058import org.apache.oozie.util.XLog;
059import com.google.common.annotations.VisibleForTesting;
060
061import org.apache.oozie.ErrorCode;
062import org.jdom.JDOMException;
063
064public class ShareLibService implements Service, Instrumentable {
065
066    public static final String LAUNCHERJAR_LIB_RETENTION = CONF_PREFIX + "ShareLibService.temp.sharelib.retention.days";
067
068    public static final String SHARELIB_MAPPING_FILE = CONF_PREFIX + "ShareLibService.mapping.file";
069
070    public static final String SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar";
071
072    public static final String PURGE_INTERVAL = CONF_PREFIX + "ShareLibService.purge.interval";
073
074    public static final String FAIL_FAST_ON_STARTUP = CONF_PREFIX + "ShareLibService.fail.fast.on.startup";
075
076    private static final String PERMISSION_STRING = "-rwxr-xr-x";
077
078    public static final String LAUNCHER_LIB_PREFIX = "launcher_";
079
080    public static final String SHARE_LIB_PREFIX = "lib_";
081
082    public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
083
084    private Services services;
085
086    private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>();
087
088    private Map<String, Map<Path, Configuration>> shareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();
089
090    private Map<String, List<Path>> launcherLibMap = new HashMap<String, List<Path>>();
091
092    private Set<String> actionConfSet = new HashSet<String>();
093
094    // symlink mapping. Oozie keeps on checking symlink path and if changes, Oozie reloads the sharelib
095    private Map<String, Map<Path, Path>> symlinkMapping = new HashMap<String, Map<Path, Path>>();
096
097    private static XLog LOG = XLog.getLog(ShareLibService.class);
098
099    private String sharelibMappingFile;
100
101    private boolean isShipLauncherEnabled = false;
102
103    public static String SHARE_LIB_CONF_PREFIX = "oozie";
104
105    private boolean shareLibLoadAttempted = false;
106
107    private String sharelibMetaFileOldTimeStamp;
108
109    private String sharelibDirOld;
110
111    FileSystem fs;
112
113    final long retentionTime = 1000 * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION);
114
115    @Override
116    public void init(Services services) throws ServiceException {
117        this.services = services;
118        sharelibMappingFile = ConfigurationService.get(services.getConf(), SHARELIB_MAPPING_FILE);
119        isShipLauncherEnabled = ConfigurationService.getBoolean(services.getConf(), SHIP_LAUNCHER_JAR);
120        boolean failOnfailure = ConfigurationService.getBoolean(services.getConf(), FAIL_FAST_ON_STARTUP);
121        Path launcherlibPath = getLauncherlibPath();
122        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
123        URI uri = launcherlibPath.toUri();
124        try {
125            fs = FileSystem.get(has.createJobConf(uri.getAuthority()));
126            //cache action key sharelib conf list
127            cacheActionKeySharelibConfList();
128            updateLauncherLib();
129            updateShareLib();
130        }
131        catch (Throwable e) {
132            if (failOnfailure) {
133                LOG.error("Sharelib initialization fails", e);
134                throw new ServiceException(ErrorCode.E0104, getClass().getName(), "Sharelib initialization fails. ", e);
135            }
136            else {
137                // We don't want to actually fail init by throwing an Exception, so only create the ServiceException and
138                // log it
139                ServiceException se = new ServiceException(ErrorCode.E0104, getClass().getName(),
140                        "Not able to cache sharelib. An Admin needs to install the sharelib with oozie-setup.sh and issue the "
141                                + "'oozie admin' CLI command to update the sharelib", e);
142                LOG.error(se);
143            }
144        }
145        Runnable purgeLibsRunnable = new Runnable() {
146            @Override
147            public void run() {
148                System.out.flush();
149                try {
150                    // Only one server should purge sharelib
151                    if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
152                        final Date current = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
153                        purgeLibs(fs, LAUNCHER_LIB_PREFIX, current);
154                        purgeLibs(fs, SHARE_LIB_PREFIX, current);
155                    }
156                }
157                catch (IOException e) {
158                    LOG.error("There was an issue purging the sharelib", e);
159                }
160            }
161        };
162        services.get(SchedulerService.class).schedule(purgeLibsRunnable, 10,
163                ConfigurationService.getInt(services.getConf(), PURGE_INTERVAL) * 60 * 60 * 24,
164                SchedulerService.Unit.SEC);
165    }
166
167    /**
168     * Recursively change permissions.
169     *
170     * @throws IOException Signals that an I/O exception has occurred.
171     */
172    private void updateLauncherLib() throws IOException {
173        if (isShipLauncherEnabled) {
174            if (fs == null) {
175                Path launcherlibPath = getLauncherlibPath();
176                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
177                URI uri = launcherlibPath.toUri();
178                fs = FileSystem.get(has.createJobConf(uri.getAuthority()));
179            }
180            Path launcherlibPath = getLauncherlibPath();
181            setupLauncherLibPath(fs, launcherlibPath);
182            recursiveChangePermissions(fs, launcherlibPath, FsPermission.valueOf(PERMISSION_STRING));
183        }
184
185    }
186
187    /**
188     * Copy launcher jars to Temp directory.
189     *
190     * @param fs the FileSystem
191     * @param tmpLauncherLibPath the tmp launcher lib path
192     * @throws IOException Signals that an I/O exception has occurred.
193     */
194    private void setupLauncherLibPath(FileSystem fs, Path tmpLauncherLibPath) throws IOException {
195
196        ActionService actionService = Services.get().get(ActionService.class);
197        List<Class> classes = JavaActionExecutor.getCommonLauncherClasses();
198        Path baseDir = new Path(tmpLauncherLibPath, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
199        copyJarContainingClasses(classes, fs, baseDir, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
200        Set<String> actionTypes = actionService.getActionTypes();
201        for (String key : actionTypes) {
202            ActionExecutor executor = actionService.getExecutor(key);
203            if (executor instanceof JavaActionExecutor) {
204                JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
205                classes = jexecutor.getLauncherClasses();
206                if (classes != null) {
207                    String type = executor.getType();
208                    Path executorDir = new Path(tmpLauncherLibPath, type);
209                    copyJarContainingClasses(classes, fs, executorDir, type);
210                }
211            }
212        }
213    }
214
215    /**
216     * Recursive change permissions.
217     *
218     * @param fs the FileSystem
219     * @param path the Path
220     * @param perm is permission
221     * @throws IOException Signals that an I/O exception has occurred.
222     */
223    private void recursiveChangePermissions(FileSystem fs, Path path, FsPermission fsPerm) throws IOException {
224        fs.setPermission(path, fsPerm);
225        FileStatus[] filesStatus = fs.listStatus(path);
226        for (int i = 0; i < filesStatus.length; i++) {
227            Path p = filesStatus[i].getPath();
228            if (filesStatus[i].isDir()) {
229                recursiveChangePermissions(fs, p, fsPerm);
230            }
231            else {
232                fs.setPermission(p, fsPerm);
233            }
234        }
235    }
236
237    /**
238     * Copy jar containing classes.
239     *
240     * @param classes the classes
241     * @param fs the FileSystem
242     * @param executorDir is Path
243     * @param type is sharelib key
244     * @throws IOException Signals that an I/O exception has occurred.
245     */
246    private void copyJarContainingClasses(List<Class> classes, FileSystem fs, Path executorDir, String type)
247            throws IOException {
248        fs.mkdirs(executorDir);
249        Set<String> localJarSet = new HashSet<String>();
250        for (Class c : classes) {
251            String localJar = findContainingJar(c);
252            if (localJar != null) {
253                localJarSet.add(localJar);
254            }
255            else {
256                throw new IOException("No jar containing " + c + " found");
257            }
258        }
259        List<Path> listOfPaths = new ArrayList<Path>();
260        for (String localJarStr : localJarSet) {
261            File localJar = new File(localJarStr);
262            fs.copyFromLocalFile(new Path(localJar.getPath()), executorDir);
263            Path path = new Path(executorDir, localJar.getName());
264            listOfPaths.add(path);
265            LOG.info(localJar.getName() + " uploaded to " + executorDir.toString());
266        }
267        launcherLibMap.put(type, listOfPaths);
268
269    }
270
271    /**
272     * Gets the path recursively.
273     *
274     * @param fs the FileSystem
275     * @param rootDir the root directory
276     * @param listOfPaths the list of paths
277     * @param shareLibKey the share lib key
278     * @return the path recursively
279     * @throws IOException Signals that an I/O exception has occurred.
280     */
281    private void getPathRecursively(FileSystem fs, Path rootDir, List<Path> listOfPaths, String shareLibKey,
282            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
283        if (rootDir == null) {
284            return;
285        }
286
287        try {
288            if (fs.isFile(new Path(new URI(rootDir.toString()).getPath()))) {
289                Path filePath = new Path(new URI(rootDir.toString()).getPath());
290                Path qualifiedRootDirPath = fs.makeQualified(rootDir);
291                if (isFilePartOfConfList(rootDir)) {
292                    cachePropertyFile(qualifiedRootDirPath, filePath, shareLibKey, shareLibConfigMap);
293                }
294                listOfPaths.add(qualifiedRootDirPath);
295                return;
296            }
297
298            FileStatus[] status = fs.listStatus(rootDir);
299            if (status == null) {
300                LOG.info("Shared lib " + rootDir + " doesn't exist, not adding to cache");
301                return;
302            }
303
304            for (FileStatus file : status) {
305                if (file.isDir()) {
306                    getPathRecursively(fs, file.getPath(), listOfPaths, shareLibKey, shareLibConfigMap);
307                }
308                else {
309                    if (isFilePartOfConfList(file.getPath())) {
310                        cachePropertyFile(file.getPath(), file.getPath(), shareLibKey, shareLibConfigMap);
311                    }
312                    listOfPaths.add(file.getPath());
313                }
314            }
315        }
316        catch (URISyntaxException e) {
317            throw new IOException(e);
318        }
319        catch (JDOMException e) {
320            throw new IOException(e);
321        }
322    }
323
324    public Map<String, List<Path>> getShareLib() {
325        return shareLibMap;
326    }
327
328    private Map<String, Map<Path, Path>> getSymlinkMapping() {
329        return symlinkMapping;
330    }
331
332    /**
333     * Gets the action sharelib lib jars.
334     *
335     * @param shareLibKey the sharelib key
336     * @return List of paths
337     * @throws IOException Signals that an I/O exception has occurred.
338     */
339    public List<Path> getShareLibJars(String shareLibKey) throws IOException {
340        // Sharelib map is empty means that on previous or startup attempt of
341        // caching sharelib has failed.Trying to reload
342        if (shareLibMap.isEmpty() && !shareLibLoadAttempted) {
343            synchronized (ShareLibService.class) {
344                if (shareLibMap.isEmpty()) {
345                    updateShareLib();
346                    shareLibLoadAttempted = true;
347                }
348            }
349        }
350        checkSymlink(shareLibKey);
351        return shareLibMap.get(shareLibKey);
352    }
353
354    private void checkSymlink(String shareLibKey) throws IOException {
355        if (!HadoopShims.isSymlinkSupported() || symlinkMapping.get(shareLibKey) == null
356                || symlinkMapping.get(shareLibKey).isEmpty()) {
357            return;
358        }
359
360        HadoopShims fileSystem = new HadoopShims(fs);
361        for (Path path : symlinkMapping.get(shareLibKey).keySet()) {
362            if (!symlinkMapping.get(shareLibKey).get(path).equals(fileSystem.getSymLinkTarget(path))) {
363                synchronized (ShareLibService.class) {
364                    Map<String, List<Path>> tmpShareLibMap = new HashMap<String, List<Path>>(shareLibMap);
365
366                    Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>(
367                            shareLibConfigMap);
368
369                    Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>(
370                            symlinkMapping);
371
372                    LOG.info(MessageFormat.format("Symlink target for [{0}] has changed, was [{1}], now [{2}]",
373                            shareLibKey, path, fileSystem.getSymLinkTarget(path)));
374                    loadShareLibMetaFile(tmpShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile,
375                            shareLibKey);
376                    shareLibMap = tmpShareLibMap;
377                    symlinkMapping = tmpSymlinkMapping;
378                    shareLibConfigMap = tmpShareLibConfigMap;
379                    return;
380                }
381
382            }
383        }
384
385    }
386
387    /**
388     * Gets the launcher jars.
389     *
390     * @param shareLibKey the shareLib key
391     * @return launcher jars paths
392     * @throws IOException Signals that an I/O exception has occurred.
393     */
394    public List<Path> getSystemLibJars(String shareLibKey) throws IOException {
395        List<Path> returnList = new ArrayList<Path>();
396        // Sharelib map is empty means that on previous or startup attempt of
397        // caching launcher jars has failed.Trying to reload
398        if (isShipLauncherEnabled) {
399            if (launcherLibMap.isEmpty()) {
400                synchronized (ShareLibService.class) {
401                    if (launcherLibMap.isEmpty()) {
402                        updateLauncherLib();
403                    }
404                }
405            }
406            if (launcherLibMap.get(shareLibKey) != null) {
407                returnList.addAll(launcherLibMap.get(shareLibKey));
408            }
409        }
410        if (shareLibKey.equals(JavaActionExecutor.OOZIE_COMMON_LIBDIR)) {
411            List<Path> sharelibList = getShareLibJars(shareLibKey);
412            if (sharelibList != null) {
413                returnList.addAll(sharelibList);
414            }
415        }
416        return returnList;
417    }
418
419    /**
420     * Find containing jar containing.
421     *
422     * @param clazz the clazz
423     * @return the string
424     */
425    @VisibleForTesting
426    protected String findContainingJar(Class clazz) {
427        ClassLoader loader = clazz.getClassLoader();
428        String classFile = clazz.getName().replaceAll("\\.", "/") + ".class";
429        try {
430            for (Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) {
431                URL url = (URL) itr.nextElement();
432                if ("jar".equals(url.getProtocol())) {
433                    String toReturn = url.getPath();
434                    if (toReturn.startsWith("file:")) {
435                        toReturn = toReturn.substring("file:".length());
436                        // URLDecoder is a misnamed class, since it actually
437                        // decodes
438                        // x-www-form-urlencoded MIME type rather than actual
439                        // URL encoding (which the file path has). Therefore it
440                        // would
441                        // decode +s to ' 's which is incorrect (spaces are
442                        // actually
443                        // either unencoded or encoded as "%20"). Replace +s
444                        // first, so
445                        // that they are kept sacred during the decoding
446                        // process.
447                        toReturn = toReturn.replaceAll("\\+", "%2B");
448                        toReturn = URLDecoder.decode(toReturn, "UTF-8");
449                        toReturn = toReturn.replaceAll("!.*$", "");
450                        return toReturn;
451                    }
452                }
453            }
454        }
455        catch (IOException ioe) {
456            throw new RuntimeException(ioe);
457        }
458        return null;
459    }
460
461    /**
462     * Purge libs.
463     *
464     * @param fs the fs
465     * @param prefix the prefix
466     * @param current the current time
467     * @throws IOException Signals that an I/O exception has occurred.
468     */
469    private void purgeLibs(FileSystem fs, final String prefix, final Date current) throws IOException {
470        Path executorLibBasePath = services.get(WorkflowAppService.class).getSystemLibPath();
471        PathFilter directoryFilter = new PathFilter() {
472            @Override
473            public boolean accept(Path path) {
474                if (path.getName().startsWith(prefix)) {
475                    String name = path.getName();
476                    String time = name.substring(prefix.length());
477                    Date d = null;
478                    try {
479                        d = dateFormat.parse(time);
480                    }
481                    catch (ParseException e) {
482                        return false;
483                    }
484                    return (current.getTime() - d.getTime()) > retentionTime;
485                }
486                else {
487                    return false;
488                }
489            }
490        };
491        FileStatus[] dirList = fs.listStatus(executorLibBasePath, directoryFilter);
492        Arrays.sort(dirList, new Comparator<FileStatus>() {
493            // sort in desc order
494            @Override
495            public int compare(FileStatus o1, FileStatus o2) {
496                return o2.getPath().getName().compareTo(o1.getPath().getName());
497            }
498        });
499
500        // Logic is to keep all share-lib between current timestamp and 7days old + 1 latest sharelib older than 7 days.
501        // refer OOZIE-1761
502        for (int i = 1; i < dirList.length; i++) {
503            Path dirPath = dirList[i].getPath();
504            fs.delete(dirPath, true);
505            LOG.info("Deleted old launcher jar lib directory {0}", dirPath.getName());
506        }
507    }
508
509    @Override
510    public void destroy() {
511        shareLibMap.clear();
512        launcherLibMap.clear();
513    }
514
515    @Override
516    public Class<? extends Service> getInterface() {
517        return ShareLibService.class;
518    }
519
520    /**
521     * Update share lib cache.
522     *
523     * @return the map
524     * @throws IOException Signals that an I/O exception has occurred.
525     */
526    public Map<String, String> updateShareLib() throws IOException {
527        Map<String, String> status = new HashMap<String, String>();
528
529        if (fs == null) {
530            Path launcherlibPath = getLauncherlibPath();
531            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
532            URI uri = launcherlibPath.toUri();
533            fs = FileSystem.get(has.createJobConf(uri.getAuthority()));
534        }
535
536        Map<String, List<Path>> tempShareLibMap = new HashMap<String, List<Path>>();
537        Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>();
538        Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();
539
540        if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
541            String sharelibMetaFileNewTimeStamp = JsonUtils.formatDateRfc822(
542                    new Date(fs.getFileStatus(new Path(sharelibMappingFile)).getModificationTime()), "GMT");
543            loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, null);
544            status.put("sharelibMetaFile", sharelibMappingFile);
545            status.put("sharelibMetaFileNewTimeStamp", sharelibMetaFileNewTimeStamp);
546            status.put("sharelibMetaFileOldTimeStamp", sharelibMetaFileOldTimeStamp);
547            sharelibMetaFileOldTimeStamp = sharelibMetaFileNewTimeStamp;
548        }
549        else {
550            Path shareLibpath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
551                    SHARE_LIB_PREFIX);
552            loadShareLibfromDFS(tempShareLibMap, shareLibpath, tmpShareLibConfigMap);
553
554            if (shareLibpath != null) {
555                status.put("sharelibDirNew", shareLibpath.toString());
556                status.put("sharelibDirOld", sharelibDirOld);
557                sharelibDirOld = shareLibpath.toString();
558            }
559
560        }
561        shareLibMap = tempShareLibMap;
562        symlinkMapping = tmpSymlinkMapping;
563        shareLibConfigMap = tmpShareLibConfigMap;
564        return status;
565    }
566
567    /**
568     * Update share lib cache. Parse the share lib directory and each sub directory is a action key
569     *
570     * @param shareLibMap the share lib jar map
571     * @param shareLibpath the share libpath
572     * @throws IOException Signals that an I/O exception has occurred.
573     */
574    private void loadShareLibfromDFS(Map<String, List<Path>> shareLibMap, Path shareLibpath,
575            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
576
577        if (shareLibpath == null) {
578            LOG.info("No share lib directory found");
579            return;
580
581        }
582
583        FileStatus[] dirList = fs.listStatus(shareLibpath);
584
585        if (dirList == null) {
586            return;
587        }
588
589        for (FileStatus dir : dirList) {
590            if (!dir.isDir()) {
591                continue;
592            }
593            List<Path> listOfPaths = new ArrayList<Path>();
594            getPathRecursively(fs, dir.getPath(), listOfPaths, dir.getPath().getName(), shareLibConfigMap);
595            shareLibMap.put(dir.getPath().getName(), listOfPaths);
596            LOG.info("Share lib for " + dir.getPath().getName() + ":" + listOfPaths);
597
598        }
599
600    }
601
602    /**
603     * Load share lib text file. Sharelib mapping files contains list of key=value. where key is the action key and
604     * value is the DFS location of sharelib files.
605     *
606     * @param shareLibMap the share lib jar map
607     * @param symlinkMapping the symlink mapping
608     * @param sharelibFileMapping the sharelib file mapping
609     * @param shareLibKey the share lib key
610     * @throws IOException Signals that an I/O exception has occurred.
611     * @parm shareLibKey the sharelib key
612     */
613    private void loadShareLibMetaFile(Map<String, List<Path>> shareLibMap, Map<String, Map<Path, Path>> symlinkMapping,
614            Map<String, Map<Path, Configuration>> shareLibConfigMap, String sharelibFileMapping, String shareLibKey)
615            throws IOException {
616
617        Path shareFileMappingPath = new Path(sharelibFileMapping);
618        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
619        FileSystem filesystem = FileSystem.get(has.createJobConf(shareFileMappingPath.toUri().getAuthority()));
620        Properties prop = new Properties();
621        prop.load(filesystem.open(new Path(sharelibFileMapping)));
622
623        for (Object keyObject : prop.keySet()) {
624            String key = (String) keyObject;
625            String mapKey = key.substring(SHARE_LIB_CONF_PREFIX.length() + 1);
626            if (key.toLowerCase().startsWith(SHARE_LIB_CONF_PREFIX)
627                    && (shareLibKey == null || shareLibKey.equals(mapKey))) {
628                loadSharelib(shareLibMap, symlinkMapping, shareLibConfigMap, mapKey,
629                        ((String) prop.get(key)).split(","));
630            }
631        }
632    }
633
634    private void loadSharelib(Map<String, List<Path>> tmpShareLibMap, Map<String, Map<Path, Path>> tmpSymlinkMapping,
635            Map<String, Map<Path, Configuration>> shareLibConfigMap, String shareLibKey, String pathList[])
636            throws IOException {
637        List<Path> listOfPaths = new ArrayList<Path>();
638        Map<Path, Path> symlinkMappingforAction = new HashMap<Path, Path>();
639        HadoopShims fileSystem = new HadoopShims(fs);
640
641        for (String dfsPath : pathList) {
642            Path path = new Path(dfsPath);
643            getPathRecursively(fs, new Path(dfsPath), listOfPaths, shareLibKey, shareLibConfigMap);
644            if (HadoopShims.isSymlinkSupported() && fileSystem.isSymlink(path)) {
645                symlinkMappingforAction.put(fs.makeQualified(path), fileSystem.getSymLinkTarget(path));
646            }
647        }
648        if (HadoopShims.isSymlinkSupported()) {
649            LOG.info("symlink for " + shareLibKey + ":" + symlinkMappingforAction);
650            tmpSymlinkMapping.put(shareLibKey, symlinkMappingforAction);
651        }
652        tmpShareLibMap.put(shareLibKey, listOfPaths);
653        LOG.info("Share lib for " + shareLibKey + ":" + listOfPaths);
654    }
655
656    /**
657     * Gets the launcherlib path.
658     *
659     * @return the launcherlib path
660     */
661    private Path getLauncherlibPath() {
662        String formattedDate = dateFormat.format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime());
663        Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX
664                + formattedDate);
665        return tmpLauncherLibPath;
666    }
667
668    /**
669     * Gets the Latest lib path.
670     *
671     * @param rootDir the root dir
672     * @param prefix the prefix
673     * @return latest lib path
674     * @throws IOException Signals that an I/O exception has occurred.
675     */
676    public Path getLatestLibPath(Path rootDir, final String prefix) throws IOException {
677        Date max = new Date(0L);
678        Path path = null;
679        PathFilter directoryFilter = new PathFilter() {
680            @Override
681            public boolean accept(Path path) {
682                return path.getName().startsWith(prefix);
683            }
684        };
685
686        FileStatus[] files = fs.listStatus(rootDir, directoryFilter);
687        for (FileStatus file : files) {
688            String name = file.getPath().getName().toString();
689            String time = name.substring(prefix.length());
690            Date d = null;
691            try {
692                d = dateFormat.parse(time);
693            }
694            catch (ParseException e) {
695                continue;
696            }
697            if (d.compareTo(max) > 0) {
698                path = file.getPath();
699                max = d;
700            }
701        }
702        // If there are no timestamped directories, fall back to root directory
703        if (path == null) {
704            path = rootDir;
705        }
706        return path;
707    }
708
709    /**
710     * Instruments the log service.
711     * <p>
712     * It sets instrumentation variables indicating the location of the sharelib and launcherlib
713     *
714     * @param instr instrumentation to use.
715     */
716    @Override
717    public void instrument(Instrumentation instr) {
718        instr.addVariable("libs", "sharelib.source", new Instrumentation.Variable<String>() {
719            @Override
720            public String getValue() {
721                if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
722                    return SHARELIB_MAPPING_FILE;
723                }
724                return WorkflowAppService.SYSTEM_LIB_PATH;
725            }
726        });
727        instr.addVariable("libs", "sharelib.mapping.file", new Instrumentation.Variable<String>() {
728            @Override
729            public String getValue() {
730                if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
731                    return sharelibMappingFile;
732                }
733                return "(none)";
734            }
735        });
736        instr.addVariable("libs", "sharelib.system.libpath", new Instrumentation.Variable<String>() {
737            @Override
738            public String getValue() {
739                String sharelibPath = "(unavailable)";
740                try {
741                    Path libPath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
742                            SHARE_LIB_PREFIX);
743                    if (libPath != null) {
744                        sharelibPath = libPath.toUri().toString();
745                    }
746                }
747                catch (IOException ioe) {
748                    // ignore exception because we're just doing instrumentation
749                }
750                return sharelibPath;
751            }
752        });
753        instr.addVariable("libs", "sharelib.mapping.file.timestamp", new Instrumentation.Variable<String>() {
754            @Override
755            public String getValue() {
756                if (!StringUtils.isEmpty(sharelibMetaFileOldTimeStamp)) {
757                    return sharelibMetaFileOldTimeStamp;
758                }
759                return "(none)";
760            }
761        });
762        instr.addVariable("libs", "sharelib.keys", new Instrumentation.Variable<String>() {
763            @Override
764            public String getValue() {
765                Map<String, List<Path>> shareLib = getShareLib();
766                if (shareLib != null && !shareLib.isEmpty()) {
767                    Set<String> keySet = shareLib.keySet();
768                    return keySet.toString();
769                }
770                return "(unavailable)";
771            }
772        });
773        instr.addVariable("libs", "launcherlib.system.libpath", new Instrumentation.Variable<String>() {
774            @Override
775            public String getValue() {
776                return getLauncherlibPath().toUri().toString();
777            }
778        });
779        instr.addVariable("libs", "sharelib.symlink.mapping", new Instrumentation.Variable<String>() {
780            @Override
781            public String getValue() {
782                Map<String, Map<Path, Path>> shareLibSymlinkMapping = getSymlinkMapping();
783                if (shareLibSymlinkMapping != null && !shareLibSymlinkMapping.isEmpty()
784                        && shareLibSymlinkMapping.values() != null && !shareLibSymlinkMapping.values().isEmpty()) {
785                    StringBuffer bf = new StringBuffer();
786                    for (Entry<String, Map<Path, Path>> entry : shareLibSymlinkMapping.entrySet())
787                        if (entry.getKey() != null && !entry.getValue().isEmpty()) {
788                            for (Path path : entry.getValue().keySet()) {
789                                bf.append(path).append("(").append(entry.getKey()).append(")").append("=>")
790                                        .append(shareLibSymlinkMapping.get(entry.getKey()) != null ? shareLibSymlinkMapping
791                                                .get(entry.getKey()).get(path) : "").append(",");
792                            }
793                        }
794                    return bf.toString();
795                }
796                return "(none)";
797            }
798        });
799
800        instr.addVariable("libs", "sharelib.cached.config.file", new Instrumentation.Variable<String>() {
801            @Override
802            public String getValue() {
803                Map<String, Map<Path, Configuration>> shareLibConfigMap = getShareLibConfigMap();
804                if (shareLibConfigMap != null && !shareLibConfigMap.isEmpty()) {
805                    StringBuffer bf = new StringBuffer();
806
807                    for (String path : shareLibConfigMap.keySet()) {
808                        bf.append(path).append(";");
809                    }
810                    return bf.toString();
811                }
812                return "(none)";
813            }
814        });
815
816    }
817
818    /**
819     * Returns file system for shared libraries.
820     * <p>
821     * If WorkflowAppService#getSystemLibPath doesn't have authority then a default one assumed
822     *
823     * @return file system for shared libraries
824     */
825    public FileSystem getFileSystem() {
826        return fs;
827    }
828
829    /**
830     * Cache XML conf file
831     *
832     * @param hdfsPath the hdfs path
833     * @param shareLibKey the share lib key
834     * @throws IOException Signals that an I/O exception has occurred.
835     * @throws JDOMException
836     */
837    private void cachePropertyFile(Path qualifiedHdfsPath, Path hdfsPath, String shareLibKey,
838            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException, JDOMException {
839        Map<Path, Configuration> confMap = shareLibConfigMap.get(shareLibKey);
840        if (confMap == null) {
841            confMap = new HashMap<Path, Configuration>();
842            shareLibConfigMap.put(shareLibKey, confMap);
843        }
844        Configuration xmlConf = new XConfiguration(fs.open(hdfsPath));
845        confMap.put(qualifiedHdfsPath, xmlConf);
846
847    }
848
849    private void cacheActionKeySharelibConfList() {
850        ActionService actionService = Services.get().get(ActionService.class);
851        Set<String> actionTypes = actionService.getActionTypes();
852        for (String key : actionTypes) {
853            ActionExecutor executor = actionService.getExecutor(key);
854            if (executor instanceof JavaActionExecutor) {
855                JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
856                actionConfSet.addAll(
857                new HashSet<String>(Arrays.asList(jexecutor.getShareLibFilesForActionConf() == null ? new String[0]
858                        : jexecutor.getShareLibFilesForActionConf())));
859            }
860        }
861    }
862
863    public Configuration getShareLibConf(String inputKey, Path path) {
864        if (shareLibConfigMap.containsKey(inputKey)) {
865            return shareLibConfigMap.get(inputKey).get(path);
866        }
867
868        return null;
869    }
870
871    @VisibleForTesting
872    public Map<String, Map<Path, Configuration>> getShareLibConfigMap() {
873        return shareLibConfigMap;
874    }
875
876    private boolean isFilePartOfConfList(Path path) throws URISyntaxException {
877        String fragmentName = new URI(path.toString()).getFragment();
878        String fileName = fragmentName == null ? path.getName() : fragmentName;
879        return actionConfSet.contains(fileName);
880    }
881}