001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.File;
022import java.io.IOException;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.net.URL;
026import java.net.URLDecoder;
027import java.text.MessageFormat;
028import java.text.ParseException;
029import java.text.SimpleDateFormat;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Calendar;
033import java.util.Comparator;
034import java.util.Date;
035import java.util.Enumeration;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Properties;
041import java.util.Set;
042import java.util.TimeZone;
043import java.util.Map.Entry;
044import org.apache.commons.lang.StringUtils;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FileStatus;
047import org.apache.hadoop.fs.FileSystem;
048import org.apache.hadoop.fs.LocalFileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.fs.PathFilter;
051import org.apache.hadoop.fs.permission.FsPermission;
052import org.apache.oozie.action.ActionExecutor;
053import org.apache.oozie.action.hadoop.JavaActionExecutor;
054import org.apache.oozie.client.rest.JsonUtils;
055import com.google.common.annotations.VisibleForTesting;
056import org.apache.oozie.ErrorCode;
057import org.apache.oozie.util.Instrumentable;
058import org.apache.oozie.util.Instrumentation;
059import org.apache.oozie.util.FSUtils;
060import org.apache.oozie.util.XConfiguration;
061import org.apache.oozie.util.XLog;
062import org.jdom.JDOMException;
063
064import static org.apache.oozie.util.FSUtils.isLocalFile;
065
066public class ShareLibService implements Service, Instrumentable {
067
068    public static final String LAUNCHERJAR_LIB_RETENTION = CONF_PREFIX + "ShareLibService.temp.sharelib.retention.days";
069
070    public static final String SHARELIB_MAPPING_FILE = CONF_PREFIX + "ShareLibService.mapping.file";
071
072    public static final String SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar";
073
074    public static final String PURGE_INTERVAL = CONF_PREFIX + "ShareLibService.purge.interval";
075
076    public static final String FAIL_FAST_ON_STARTUP = CONF_PREFIX + "ShareLibService.fail.fast.on.startup";
077
078    private static final String PERMISSION_STRING = "-rwxr-xr-x";
079
080    public static final String LAUNCHER_LIB_PREFIX = "launcher_";
081
082    public static final String SHARE_LIB_PREFIX = "lib_";
083
084    public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
085
086    private Services services;
087
088    private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>();
089
090    private Map<String, Map<Path, Configuration>> shareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();
091
092    private Map<String, List<Path>> launcherLibMap = new HashMap<String, List<Path>>();
093
094    private Set<String> actionConfSet = new HashSet<String>();
095
096    // symlink mapping. Oozie keeps on checking symlink path and if changes, Oozie reloads the sharelib
097    private Map<String, Map<Path, Path>> symlinkMapping = new HashMap<String, Map<Path, Path>>();
098
099    private static XLog LOG = XLog.getLog(ShareLibService.class);
100
101    private String sharelibMappingFile;
102
103    private boolean isShipLauncherEnabled = false;
104
105    public static String SHARE_LIB_CONF_PREFIX = "oozie";
106
107    private boolean shareLibLoadAttempted = false;
108
109    private String sharelibMetaFileOldTimeStamp;
110
111    private String sharelibDirOld;
112
113    FileSystem fs;
114    FileSystem localFs;
115
116    final long retentionTime = 1000L * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION);
117
118    @Override
119    public void init(Services services) throws ServiceException {
120        this.services = services;
121        sharelibMappingFile = ConfigurationService.get(services.getConf(), SHARELIB_MAPPING_FILE);
122        isShipLauncherEnabled = ConfigurationService.getBoolean(services.getConf(), SHIP_LAUNCHER_JAR);
123        boolean failOnfailure = ConfigurationService.getBoolean(services.getConf(), FAIL_FAST_ON_STARTUP);
124        Path launcherlibPath = getLauncherlibPath();
125        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
126        URI uri = launcherlibPath.toUri();
127        try {
128
129            fs = FileSystem.get(has.createConfiguration(uri.getAuthority()));
130            localFs = LocalFileSystem.get(new Configuration(false));
131            //cache action key sharelib conf list
132            cacheActionKeySharelibConfList();
133            updateLauncherLib();
134            updateShareLib();
135        }
136        catch (Throwable e) {
137            if (failOnfailure) {
138                LOG.error("Sharelib initialization fails", e);
139                throw new ServiceException(ErrorCode.E0104, getClass().getName(), "Sharelib initialization fails. ", e);
140            }
141            else {
142                // We don't want to actually fail init by throwing an Exception, so only create the ServiceException and
143                // log it
144                ServiceException se = new ServiceException(ErrorCode.E0104, getClass().getName(),
145                        "Not able to cache sharelib. An Admin needs to install the sharelib with oozie-setup.sh and issue the "
146                                + "'oozie admin' CLI command to update the sharelib", e);
147                LOG.error(se);
148            }
149        }
150        Runnable purgeLibsRunnable = new Runnable() {
151            @Override
152            public void run() {
153                System.out.flush();
154                try {
155                    // Only one server should purge sharelib
156                    if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
157                        final Date current = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
158                        purgeLibs(fs, LAUNCHER_LIB_PREFIX, current);
159                        purgeLibs(fs, SHARE_LIB_PREFIX, current);
160                    }
161                }
162                catch (IOException e) {
163                    LOG.error("There was an issue purging the sharelib", e);
164                }
165            }
166        };
167        services.get(SchedulerService.class).schedule(purgeLibsRunnable, 10,
168                ConfigurationService.getInt(services.getConf(), PURGE_INTERVAL) * 60 * 60 * 24,
169                SchedulerService.Unit.SEC);
170    }
171
172    /**
173     * Recursively change permissions.
174     *
175     * @throws IOException Signals that an I/O exception has occurred.
176     */
177    private void updateLauncherLib() throws IOException {
178        if (isShipLauncherEnabled) {
179            if (fs == null) {
180                Path launcherlibPath = getLauncherlibPath();
181                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
182                URI uri = launcherlibPath.toUri();
183                fs = FileSystem.get(has.createConfiguration(uri.getAuthority()));
184            }
185            Path launcherlibPath = getLauncherlibPath();
186            setupLauncherLibPath(fs, launcherlibPath);
187            recursiveChangePermissions(fs, launcherlibPath, FsPermission.valueOf(PERMISSION_STRING));
188        }
189
190    }
191
192    /**
193     * Copy launcher jars to Temp directory.
194     *
195     * @param fs the FileSystem
196     * @param tmpLauncherLibPath the tmp launcher lib path
197     * @throws IOException Signals that an I/O exception has occurred.
198     */
199    private void setupLauncherLibPath(FileSystem fs, Path tmpLauncherLibPath) throws IOException {
200
201        ActionService actionService = Services.get().get(ActionService.class);
202        List<Class<?>> classes = JavaActionExecutor.getCommonLauncherClasses();
203        Path baseDir = new Path(tmpLauncherLibPath, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
204        copyJarContainingClasses(classes, fs, baseDir, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
205        Set<String> actionTypes = actionService.getActionTypes();
206        for (String key : actionTypes) {
207            ActionExecutor executor = actionService.getExecutor(key);
208            if (executor instanceof JavaActionExecutor) {
209                JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
210                classes = jexecutor.getLauncherClasses();
211                if (classes != null) {
212                    String type = executor.getType();
213                    Path executorDir = new Path(tmpLauncherLibPath, type);
214                    copyJarContainingClasses(classes, fs, executorDir, type);
215                }
216            }
217        }
218    }
219
220    /**
221     * Recursive change permissions.
222     *
223     * @param fs the FileSystem
224     * @param path the Path
225     * @param fsPerm is permission
226     * @throws IOException Signals that an I/O exception has occurred.
227     */
228    private void recursiveChangePermissions(FileSystem fs, Path path, FsPermission fsPerm) throws IOException {
229        fs.setPermission(path, fsPerm);
230        FileStatus[] filesStatus = fs.listStatus(path);
231        for (int i = 0; i < filesStatus.length; i++) {
232            Path p = filesStatus[i].getPath();
233            if (filesStatus[i].isDirectory()) {
234                recursiveChangePermissions(fs, p, fsPerm);
235            }
236            else {
237                fs.setPermission(p, fsPerm);
238            }
239        }
240    }
241
242    /**
243     * Copy jar containing classes.
244     *
245     * @param classes the classes
246     * @param fs the FileSystem
247     * @param executorDir is Path
248     * @param type is sharelib key
249     * @throws IOException Signals that an I/O exception has occurred.
250     */
251    private void copyJarContainingClasses(List<Class<?>> classes, FileSystem fs, Path executorDir, String type)
252            throws IOException {
253        fs.mkdirs(executorDir);
254        Set<String> localJarSet = new HashSet<String>();
255        for (Class<?> c : classes) {
256            String localJar = findContainingJar(c);
257            if (localJar != null) {
258                localJarSet.add(localJar);
259            }
260            else {
261                throw new IOException("No jar containing " + c + " found");
262            }
263        }
264        List<Path> listOfPaths = new ArrayList<Path>();
265        for (String localJarStr : localJarSet) {
266            File localJar = new File(localJarStr);
267            fs.copyFromLocalFile(new Path(localJar.getPath()), executorDir);
268            Path path = new Path(executorDir, localJar.getName());
269            listOfPaths.add(path);
270            LOG.info(localJar.getName() + " uploaded to " + executorDir.toString());
271        }
272        launcherLibMap.put(type, listOfPaths);
273
274    }
275
276    /**
277     * Gets the path recursively.
278     *
279     * @param fs the FileSystem
280     * @param rootDir the root directory
281     * @param listOfPaths the list of paths
282     * @param shareLibKey the share lib key
283     * @return the path recursively
284     * @throws IOException Signals that an I/O exception has occurred.
285     */
286    private void getPathRecursively(FileSystem fs, Path rootDir, List<Path> listOfPaths, String shareLibKey,
287            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
288        if (rootDir == null) {
289            return;
290        }
291
292        try {
293            if (fs.isFile(new Path(new URI(rootDir.toString()).getPath()))) {
294                Path filePath = new Path(new URI(rootDir.toString()).getPath());
295                Path qualifiedRootDirPath = fs.makeQualified(rootDir);
296                if (isFilePartOfConfList(rootDir)) {
297                    cachePropertyFile(qualifiedRootDirPath, filePath, shareLibKey, shareLibConfigMap);
298                }
299                listOfPaths.add(qualifiedRootDirPath);
300                return;
301            }
302
303            FileStatus[] status = fs.listStatus(rootDir);
304            if (status == null) {
305                LOG.info("Shared lib " + rootDir + " doesn't exist, not adding to cache");
306                return;
307            }
308
309            for (FileStatus file : status) {
310                if (file.isDirectory()) {
311                    getPathRecursively(fs, file.getPath(), listOfPaths, shareLibKey, shareLibConfigMap);
312                }
313                else {
314                    if (isFilePartOfConfList(file.getPath())) {
315                        cachePropertyFile(file.getPath(), file.getPath(), shareLibKey, shareLibConfigMap);
316                    }
317                    listOfPaths.add(file.getPath());
318                }
319            }
320        }
321        catch (URISyntaxException e) {
322            throw new IOException(e);
323        }
324        catch (JDOMException e) {
325            throw new IOException(e);
326        }
327    }
328
329    public Map<String, List<Path>> getShareLib() {
330        return shareLibMap;
331    }
332
333    private Map<String, Map<Path, Path>> getSymlinkMapping() {
334        return symlinkMapping;
335    }
336
337    /**
338     * Gets the action sharelib lib jars.
339     *
340     * @param shareLibKey the sharelib key
341     * @return List of paths
342     * @throws IOException Signals that an I/O exception has occurred.
343     */
344    public List<Path> getShareLibJars(String shareLibKey) throws IOException {
345        // Sharelib map is empty means that on previous or startup attempt of
346        // caching sharelib has failed.Trying to reload
347        if (shareLibMap.isEmpty() && !shareLibLoadAttempted) {
348            synchronized (ShareLibService.class) {
349                if (shareLibMap.isEmpty()) {
350                    updateShareLib();
351                    shareLibLoadAttempted = true;
352                }
353            }
354        }
355        checkSymlink(shareLibKey);
356        return shareLibMap.get(shareLibKey);
357    }
358
359    private void checkSymlink(final String shareLibKey) throws IOException {
360        if (symlinkMapping.get(shareLibKey) == null || symlinkMapping.get(shareLibKey).isEmpty()) {
361            return;
362        }
363
364        for (final Path symlinkPath : symlinkMapping.get(shareLibKey).keySet()) {
365            final FileSystem fileSystem = getHostFileSystem(symlinkPath);
366            final Path symLinkTarget = FSUtils.getSymLinkTarget(fileSystem, symlinkPath);
367            final boolean symlinkIsNotTarget = !getSymlinkSharelibPath(shareLibKey, symlinkPath).equals(symLinkTarget);
368            if (symlinkIsNotTarget) {
369                synchronized (ShareLibService.class) {
370                    final Map<String, List<Path>> tmpShareLibMap = new HashMap<String, List<Path>>(shareLibMap);
371
372                    final Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<>(shareLibConfigMap);
373
374                    final Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>(
375                            symlinkMapping);
376
377                    LOG.info(MessageFormat.format("Symlink target for [{0}] has changed, was [{1}], now [{2}]",
378                            shareLibKey, symlinkPath, symLinkTarget));
379                    loadShareLibMetaFile(tmpShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile,
380                            shareLibKey);
381                    shareLibMap = tmpShareLibMap;
382                    symlinkMapping = tmpSymlinkMapping;
383                    shareLibConfigMap = tmpShareLibConfigMap;
384                    return;
385                }
386            }
387        }
388    }
389
390    private Path getSymlinkSharelibPath(String shareLibKey, Path path) {
391        return symlinkMapping.get(shareLibKey).get(path);
392    }
393
394    private FileSystem getHostFileSystem(String pathStr) {
395        FileSystem fileSystem;
396        if (isLocalFile(pathStr)) {
397            fileSystem = localFs;
398        }
399        else {
400            fileSystem = fs;
401        }
402        return fileSystem;
403    }
404
405    private FileSystem getHostFileSystem(Path path) {
406        return getHostFileSystem(path.toString());
407    }
408
409    /**
410     * Gets the launcher jars.
411     *
412     * @param shareLibKey the shareLib key
413     * @return launcher jars paths
414     * @throws IOException Signals that an I/O exception has occurred.
415     */
416    public List<Path> getSystemLibJars(String shareLibKey) throws IOException {
417        List<Path> returnList = new ArrayList<Path>();
418        // Sharelib map is empty means that on previous or startup attempt of
419        // caching launcher jars has failed.Trying to reload
420        if (isShipLauncherEnabled) {
421            if (launcherLibMap.isEmpty()) {
422                synchronized (ShareLibService.class) {
423                    if (launcherLibMap.isEmpty()) {
424                        updateLauncherLib();
425                    }
426                }
427            }
428            if (launcherLibMap.get(shareLibKey) != null) {
429                returnList.addAll(launcherLibMap.get(shareLibKey));
430            }
431        }
432        if (shareLibKey.equals(JavaActionExecutor.OOZIE_COMMON_LIBDIR)) {
433            List<Path> sharelibList = getShareLibJars(shareLibKey);
434            if (sharelibList != null) {
435                returnList.addAll(sharelibList);
436            }
437        }
438        return returnList;
439    }
440
441    /**
442     * Find containing jar containing.
443     *
444     * @param clazz the clazz
445     * @return the string
446     */
447    @VisibleForTesting
448    protected String findContainingJar(Class<?> clazz) {
449        ClassLoader loader = clazz.getClassLoader();
450        String classFile = clazz.getName().replaceAll("\\.", "/") + ".class";
451        try {
452            for (Enumeration<URL> itr = loader.getResources(classFile); itr.hasMoreElements();) {
453                URL url = itr.nextElement();
454                if ("jar".equals(url.getProtocol())) {
455                    String toReturn = url.getPath();
456                    if (toReturn.startsWith("file:")) {
457                        toReturn = toReturn.substring("file:".length());
458                        // URLDecoder is a misnamed class, since it actually
459                        // decodes
460                        // x-www-form-urlencoded MIME type rather than actual
461                        // URL encoding (which the file path has). Therefore it
462                        // would
463                        // decode +s to ' 's which is incorrect (spaces are
464                        // actually
465                        // either unencoded or encoded as "%20"). Replace +s
466                        // first, so
467                        // that they are kept sacred during the decoding
468                        // process.
469                        toReturn = toReturn.replaceAll("\\+", "%2B");
470                        toReturn = URLDecoder.decode(toReturn, "UTF-8");
471                        toReturn = toReturn.replaceAll("!.*$", "");
472                        return toReturn;
473                    }
474                }
475            }
476        }
477        catch (IOException ioe) {
478            throw new RuntimeException(ioe);
479        }
480        return null;
481    }
482
483    /**
484     * Purge libs.
485     *
486     * @param fs the fs
487     * @param prefix the prefix
488     * @param current the current time
489     * @throws IOException Signals that an I/O exception has occurred.
490     */
491    private void purgeLibs(FileSystem fs, final String prefix, final Date current) throws IOException {
492        Path executorLibBasePath = services.get(WorkflowAppService.class).getSystemLibPath();
493        PathFilter directoryFilter = new PathFilter() {
494            @Override
495            public boolean accept(Path path) {
496                if (path.getName().startsWith(prefix)) {
497                    String name = path.getName();
498                    String time = name.substring(prefix.length());
499                    Date d = null;
500                    try {
501                        d = dateFormat.parse(time);
502                    }
503                    catch (ParseException e) {
504                        return false;
505                    }
506                    return (current.getTime() - d.getTime()) > retentionTime;
507                }
508                else {
509                    return false;
510                }
511            }
512        };
513        FileStatus[] dirList = fs.listStatus(executorLibBasePath, directoryFilter);
514        Arrays.sort(dirList, new Comparator<FileStatus>() {
515            // sort in desc order
516            @Override
517            public int compare(FileStatus o1, FileStatus o2) {
518                return o2.getPath().getName().compareTo(o1.getPath().getName());
519            }
520        });
521
522        // Logic is to keep all share-lib between current timestamp and 7days old + 1 latest sharelib older than 7 days.
523        // refer OOZIE-1761
524        for (int i = 1; i < dirList.length; i++) {
525            Path dirPath = dirList[i].getPath();
526            fs.delete(dirPath, true);
527            LOG.info("Deleted old launcher jar lib directory {0}", dirPath.getName());
528        }
529    }
530
531    @Override
532    public void destroy() {
533        shareLibMap.clear();
534        launcherLibMap.clear();
535    }
536
537    @Override
538    public Class<? extends Service> getInterface() {
539        return ShareLibService.class;
540    }
541
542    /**
543     * Update share lib cache.
544     *
545     * @return the map
546     * @throws IOException Signals that an I/O exception has occurred.
547     */
548    public Map<String, String> updateShareLib() throws IOException {
549        Map<String, String> status = new HashMap<String, String>();
550
551        if (fs == null) {
552            Path launcherlibPath = getLauncherlibPath();
553            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
554            URI uri = launcherlibPath.toUri();
555            fs = FileSystem.get(has.createConfiguration(uri.getAuthority()));
556        }
557
558        Map<String, List<Path>> tempShareLibMap = new HashMap<String, List<Path>>();
559        Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>();
560        Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>();
561
562        String trimmedSharelibMappingFile = sharelibMappingFile.trim();
563        if (!StringUtils.isEmpty(trimmedSharelibMappingFile)) {
564            FileSystem fileSystem = getHostFileSystem(trimmedSharelibMappingFile);
565
566            String sharelibMetaFileNewTimeStamp = JsonUtils.formatDateRfc822(
567                    new Date(fileSystem.getFileStatus(new Path(sharelibMappingFile)).getModificationTime()), "GMT");
568
569            loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, null);
570            status.put("sharelibMetaFile", sharelibMappingFile);
571            status.put("sharelibMetaFileNewTimeStamp", sharelibMetaFileNewTimeStamp);
572            status.put("sharelibMetaFileOldTimeStamp", sharelibMetaFileOldTimeStamp);
573            sharelibMetaFileOldTimeStamp = sharelibMetaFileNewTimeStamp;
574        }
575        else {
576            Path shareLibpath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
577                    SHARE_LIB_PREFIX);
578            loadShareLibfromDFS(tempShareLibMap, shareLibpath, tmpShareLibConfigMap);
579
580            if (shareLibpath != null) {
581                status.put("sharelibDirNew", shareLibpath.toString());
582                status.put("sharelibDirOld", sharelibDirOld);
583                sharelibDirOld = shareLibpath.toString();
584            }
585
586        }
587        shareLibMap = tempShareLibMap;
588        symlinkMapping = tmpSymlinkMapping;
589        shareLibConfigMap = tmpShareLibConfigMap;
590        return status;
591    }
592
593    /**
594     * Update share lib cache. Parse the share lib directory and each sub directory is a action key
595     *
596     * @param shareLibMap the share lib jar map
597     * @param shareLibpath the share libpath
598     * @throws IOException Signals that an I/O exception has occurred.
599     */
600    private void loadShareLibfromDFS(Map<String, List<Path>> shareLibMap, Path shareLibpath,
601            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException {
602
603        if (shareLibpath == null) {
604            LOG.info("No share lib directory found");
605            return;
606
607        }
608
609        FileStatus[] dirList = fs.listStatus(shareLibpath);
610
611        if (dirList == null) {
612            return;
613        }
614
615        for (FileStatus dir : dirList) {
616            if (!dir.isDirectory()) {
617                continue;
618            }
619            List<Path> listOfPaths = new ArrayList<Path>();
620            getPathRecursively(fs, dir.getPath(), listOfPaths, dir.getPath().getName(), shareLibConfigMap);
621            shareLibMap.put(dir.getPath().getName(), listOfPaths);
622            LOG.info("Share lib for " + dir.getPath().getName() + ":" + listOfPaths);
623
624        }
625
626    }
627
628    /**
629     * Load share lib text file. Sharelib mapping files contains list of key=value. where key is the action key and
630     * value is the DFS location of sharelib files.
631     *
632     * @param shareLibMap the share lib jar map
633     * @param symlinkMapping the symlink mapping
634     * @param sharelibFileMapping the sharelib file mapping
635     * @param shareLibKey the share lib key
636     * @throws IOException Signals that an I/O exception has occurred.
637     * @parm shareLibKey the sharelib key
638     */
639    private void loadShareLibMetaFile(Map<String, List<Path>> shareLibMap, Map<String, Map<Path, Path>> symlinkMapping,
640            Map<String, Map<Path, Configuration>> shareLibConfigMap, String sharelibFileMapping, String shareLibKey)
641            throws IOException {
642
643        Path shareFileMappingPath = new Path(sharelibFileMapping);
644        FileSystem filesystem = getHostFileSystem(shareFileMappingPath);
645
646        Properties prop = new Properties();
647        prop.load(filesystem.open(new Path(sharelibFileMapping)));
648
649        for (Object keyObject : prop.keySet()) {
650            String key = (String) keyObject;
651            String mapKey = key.substring(SHARE_LIB_CONF_PREFIX.length() + 1);
652            if (key.toLowerCase().startsWith(SHARE_LIB_CONF_PREFIX)
653                    && (shareLibKey == null || shareLibKey.equals(mapKey))) {
654                loadSharelib(shareLibMap, symlinkMapping, shareLibConfigMap, mapKey,
655                        ((String) prop.get(key)).split(","));
656            }
657        }
658    }
659
660    private void loadSharelib(Map<String, List<Path>> tmpShareLibMap, Map<String, Map<Path, Path>> tmpSymlinkMapping,
661            Map<String, Map<Path, Configuration>> shareLibConfigMap, String shareLibKey, String pathList[])
662            throws IOException {
663        List<Path> listOfPaths = new ArrayList<Path>();
664        Map<Path, Path> symlinkMappingforAction = new HashMap<Path, Path>();
665
666        for (String pathStr : pathList) {
667            Path path = new Path(pathStr);
668            final FileSystem fileSystem = getHostFileSystem(pathStr);
669
670            getPathRecursively(fileSystem, path, listOfPaths, shareLibKey, shareLibConfigMap);
671            if (FSUtils.isSymlink(fileSystem, path)) {
672                symlinkMappingforAction.put(path, FSUtils.getSymLinkTarget(fileSystem, path));
673            }
674        }
675
676        LOG.info("symlink for " + shareLibKey + ":" + symlinkMappingforAction);
677        tmpSymlinkMapping.put(shareLibKey, symlinkMappingforAction);
678
679        tmpShareLibMap.put(shareLibKey, listOfPaths);
680        LOG.info("Share lib for " + shareLibKey + ":" + listOfPaths);
681    }
682
683    /**
684     * Gets the launcherlib path.
685     *
686     * @return the launcherlib path
687     */
688    private Path getLauncherlibPath() {
689        String formattedDate = dateFormat.format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime());
690        Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX
691                + formattedDate);
692        return tmpLauncherLibPath;
693    }
694
695    /**
696     * Gets the Latest lib path.
697     *
698     * @param rootDir the root dir
699     * @param prefix the prefix
700     * @return latest lib path
701     * @throws IOException Signals that an I/O exception has occurred.
702     */
703    public Path getLatestLibPath(Path rootDir, final String prefix) throws IOException {
704        Date max = new Date(0L);
705        Path path = null;
706        PathFilter directoryFilter = new PathFilter() {
707            @Override
708            public boolean accept(Path path) {
709                return path.getName().startsWith(prefix);
710            }
711        };
712
713        FileStatus[] files = fs.listStatus(rootDir, directoryFilter);
714        for (FileStatus file : files) {
715            String name = file.getPath().getName().toString();
716            String time = name.substring(prefix.length());
717            Date d = null;
718            try {
719                d = dateFormat.parse(time);
720            }
721            catch (ParseException e) {
722                continue;
723            }
724            if (d.compareTo(max) > 0) {
725                path = file.getPath();
726                max = d;
727            }
728        }
729        // If there are no timestamped directories, fall back to root directory
730        if (path == null) {
731            path = rootDir;
732        }
733        return path;
734    }
735
736    /**
737     * Instruments the log service.
738     * <p>
739     * It sets instrumentation variables indicating the location of the sharelib and launcherlib
740     *
741     * @param instr instrumentation to use.
742     */
743    @Override
744    public void instrument(Instrumentation instr) {
745        instr.addVariable("libs", "sharelib.source", new Instrumentation.Variable<String>() {
746            @Override
747            public String getValue() {
748                if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
749                    return SHARELIB_MAPPING_FILE;
750                }
751                return WorkflowAppService.SYSTEM_LIB_PATH;
752            }
753        });
754        instr.addVariable("libs", "sharelib.mapping.file", new Instrumentation.Variable<String>() {
755            @Override
756            public String getValue() {
757                if (!StringUtils.isEmpty(sharelibMappingFile.trim())) {
758                    return sharelibMappingFile;
759                }
760                return "(none)";
761            }
762        });
763        instr.addVariable("libs", "sharelib.system.libpath", new Instrumentation.Variable<String>() {
764            @Override
765            public String getValue() {
766                String sharelibPath = "(unavailable)";
767                try {
768                    Path libPath = getLatestLibPath(services.get(WorkflowAppService.class).getSystemLibPath(),
769                            SHARE_LIB_PREFIX);
770                    if (libPath != null) {
771                        sharelibPath = libPath.toUri().toString();
772                    }
773                }
774                catch (IOException ioe) {
775                    // ignore exception because we're just doing instrumentation
776                }
777                return sharelibPath;
778            }
779        });
780        instr.addVariable("libs", "sharelib.mapping.file.timestamp", new Instrumentation.Variable<String>() {
781            @Override
782            public String getValue() {
783                if (!StringUtils.isEmpty(sharelibMetaFileOldTimeStamp)) {
784                    return sharelibMetaFileOldTimeStamp;
785                }
786                return "(none)";
787            }
788        });
789        instr.addVariable("libs", "sharelib.keys", new Instrumentation.Variable<String>() {
790            @Override
791            public String getValue() {
792                Map<String, List<Path>> shareLib = getShareLib();
793                if (shareLib != null && !shareLib.isEmpty()) {
794                    Set<String> keySet = shareLib.keySet();
795                    return keySet.toString();
796                }
797                return "(unavailable)";
798            }
799        });
800        instr.addVariable("libs", "launcherlib.system.libpath", new Instrumentation.Variable<String>() {
801            @Override
802            public String getValue() {
803                return getLauncherlibPath().toUri().toString();
804            }
805        });
806        instr.addVariable("libs", "sharelib.symlink.mapping", new Instrumentation.Variable<String>() {
807            @Override
808            public String getValue() {
809                Map<String, Map<Path, Path>> shareLibSymlinkMapping = getSymlinkMapping();
810                if (shareLibSymlinkMapping != null && !shareLibSymlinkMapping.isEmpty()
811                        && shareLibSymlinkMapping.values() != null && !shareLibSymlinkMapping.values().isEmpty()) {
812                    StringBuffer bf = new StringBuffer();
813                    for (Entry<String, Map<Path, Path>> entry : shareLibSymlinkMapping.entrySet())
814                        if (entry.getKey() != null && !entry.getValue().isEmpty()) {
815                            for (Path path : entry.getValue().keySet()) {
816                                bf.append(path).append("(").append(entry.getKey()).append(")").append("=>")
817                                        .append(shareLibSymlinkMapping.get(entry.getKey()) != null ? shareLibSymlinkMapping
818                                                .get(entry.getKey()).get(path) : "").append(",");
819                            }
820                        }
821                    return bf.toString();
822                }
823                return "(none)";
824            }
825        });
826
827        instr.addVariable("libs", "sharelib.cached.config.file", new Instrumentation.Variable<String>() {
828            @Override
829            public String getValue() {
830                Map<String, Map<Path, Configuration>> shareLibConfigMap = getShareLibConfigMap();
831                if (shareLibConfigMap != null && !shareLibConfigMap.isEmpty()) {
832                    StringBuffer bf = new StringBuffer();
833
834                    for (String path : shareLibConfigMap.keySet()) {
835                        bf.append(path).append(";");
836                    }
837                    return bf.toString();
838                }
839                return "(none)";
840            }
841        });
842
843    }
844
845    /**
846     * Returns file system for shared libraries.
847     * <p>
848     * If WorkflowAppService#getSystemLibPath doesn't have authority then a default one assumed
849     *
850     * @return file system for shared libraries
851     */
852    public FileSystem getFileSystem() {
853        return fs;
854    }
855
856    /**
857     * Cache XML conf file
858     *
859     * @param propertyFilePath the path of the property file
860     * @param shareLibKey the share lib key
861     * @throws IOException Signals that an I/O exception has occurred.
862     * @throws JDOMException
863     */
864    private void cachePropertyFile(Path qualifiedHdfsPath, Path propertyFilePath, String shareLibKey,
865            Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException, JDOMException {
866        Map<Path, Configuration> confMap = shareLibConfigMap.get(shareLibKey);
867        if (confMap == null) {
868            confMap = new HashMap<Path, Configuration>();
869            shareLibConfigMap.put(shareLibKey, confMap);
870        }
871        FileSystem fileSystem = getHostFileSystem(propertyFilePath);
872        Configuration xmlConf = new XConfiguration(fileSystem.open(propertyFilePath));
873        confMap.put(qualifiedHdfsPath, xmlConf);
874    }
875
876    private void cacheActionKeySharelibConfList() {
877        ActionService actionService = Services.get().get(ActionService.class);
878        Set<String> actionTypes = actionService.getActionTypes();
879        for (String key : actionTypes) {
880            ActionExecutor executor = actionService.getExecutor(key);
881            if (executor instanceof JavaActionExecutor) {
882                JavaActionExecutor jexecutor = (JavaActionExecutor) executor;
883                actionConfSet.addAll(
884                new HashSet<String>(Arrays.asList(jexecutor.getShareLibFilesForActionConf() == null ? new String[0]
885                        : jexecutor.getShareLibFilesForActionConf())));
886            }
887        }
888    }
889
890    public Configuration getShareLibConf(String inputKey, Path path) {
891        if (shareLibConfigMap.containsKey(inputKey)) {
892            return shareLibConfigMap.get(inputKey).get(path);
893        }
894
895        return null;
896    }
897
898    @VisibleForTesting
899    public Map<String, Map<Path, Configuration>> getShareLibConfigMap() {
900        return shareLibConfigMap;
901    }
902
903    private boolean isFilePartOfConfList(Path path) throws URISyntaxException {
904        String fragmentName = new URI(path.toString()).getFragment();
905        String fileName = fragmentName == null ? path.getName() : fragmentName;
906        return actionConfSet.contains(fileName);
907    }
908}