001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import org.apache.commons.logging.LogFactory;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.util.ReflectionUtils;
024import org.apache.hadoop.util.VersionInfo;
025import org.apache.oozie.BuildInfo;
026import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
027import org.apache.oozie.util.DateUtils;
028import org.apache.oozie.util.XLog;
029import org.apache.oozie.util.Instrumentable;
030import org.apache.oozie.util.Instrumentation;
031import org.apache.oozie.util.IOUtils;
032import org.apache.oozie.ErrorCode;
033
034import java.util.ArrayList;
035import java.util.Collections;
036import java.util.LinkedHashMap;
037import java.util.List;
038import java.util.Map;
039import java.io.IOException;
040import java.io.File;
041
042/**
043 * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p> It has 2 built in
044 * services: {@link XLogService} and {@link ConfigurationService}. <p> The rest of the services are loaded from the
045 * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces
046 * and enters are allowed). <p> The {@link #CONF_SYSTEM_MODE} configuration property is any of
047 * NORMAL/SAFEMODE/NOWEBSERVICE. <p> Services are loaded and initialized in the order they are defined in the in
048 * configuration property. <p> After all services are initialized, if the Instrumentation service is present, all
049 * services that implement the {@link Instrumentable} are instrumented. <p> Services are destroyed in reverse order.
050 * <p> If services initialization fail, initialized services are immediatly destroyed.
051 */
052public class Services {
053    private static final int MAX_SYSTEM_ID_LEN = 10;
054
055    /**
056     * Environment variable that indicates the location of the Oozie home directory.
057     * The Oozie home directory is used to pick up the conf/ directory from
058     */
059    public static final String OOZIE_HOME_DIR = "oozie.home.dir";
060
061    public static final String CONF_SYSTEM_ID = "oozie.system.id";
062
063    public static final String CONF_SERVICE_CLASSES = "oozie.services";
064
065    public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext";
066
067    public static final String CONF_SYSTEM_MODE = "oozie.systemmode";
068
069    public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown";
070
071    private static Services SERVICES;
072
073    private SYSTEM_MODE systemMode;
074    private String runtimeDir;
075    private Configuration conf;
076    private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>();
077    private String systemId;
078    private static String oozieHome;
079
080    public static void setOozieHome() throws ServiceException {
081        oozieHome = System.getProperty(OOZIE_HOME_DIR);
082        if (oozieHome == null) {
083            throw new ServiceException(ErrorCode.E0000);
084        }
085        File file = new File(oozieHome);
086        if (!file.isAbsolute()) {
087            throw new ServiceException(ErrorCode.E0003, oozieHome);
088        }
089        if (!file.exists()) {
090            throw new ServiceException(ErrorCode.E0004, oozieHome);
091        }
092    }
093
094    public static String getOozieHome() throws ServiceException {
095        return oozieHome;
096    }
097
098    /**
099     * Create a services. <p> The built in services are initialized.
100     *
101     * @throws ServiceException thrown if any of the built in services could not initialize.
102     */
103    public Services() throws ServiceException {
104        setOozieHome();
105        if (SERVICES != null) {
106            XLog log = XLog.getLog(getClass());
107            log.warn(XLog.OPS, "Previous services singleton active, destroying it");
108            SERVICES.destroy();
109            SERVICES = null;
110        }
111        setServiceInternal(XLogService.class, false);
112        setServiceInternal(ConfigurationService.class, true);
113        conf = get(ConfigurationService.class).getConf();
114        DateUtils.setConf(conf);
115        if (!DateUtils.getOozieProcessingTimeZone().equals(DateUtils.UTC)) {
116            XLog.getLog(getClass()).warn("Oozie configured to work in a timezone other than UTC: {0}",
117                                         DateUtils.getOozieProcessingTimeZone().getID());
118        }
119        systemId = ConfigurationService.get(conf, CONF_SYSTEM_ID);
120        if (systemId.length() > MAX_SYSTEM_ID_LEN) {
121            systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN);
122            XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId,
123                                         MAX_SYSTEM_ID_LEN);
124        }
125        setSystemMode(SYSTEM_MODE.valueOf(ConfigurationService.get(conf, CONF_SYSTEM_MODE)));
126        runtimeDir = createRuntimeDir();
127    }
128
129    private String createRuntimeDir() throws ServiceException {
130        try {
131            File file = File.createTempFile(getSystemId(), ".dir");
132            file.delete();
133            if (!file.mkdir()) {
134                ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath());
135                XLog.getLog(getClass()).fatal(ex);
136                throw ex;
137            }
138            XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath());
139            return file.getAbsolutePath();
140        }
141        catch (IOException ex) {
142            ServiceException sex = new ServiceException(ErrorCode.E0001, "", ex);
143            XLog.getLog(getClass()).fatal(ex);
144            throw sex;
145        }
146    }
147
148    /**
149     * Return active system mode. <p> .
150     *
151     * @return systemMode returns active system mode
152     */
153
154    public SYSTEM_MODE getSystemMode() {
155        return systemMode;
156    }
157
158    /**
159     * Return the runtime directory of the Oozie instance. <p> The directory is created under TMP and it is always a
160     * new directory per Services initialization.
161     *
162     * @return the runtime directory of the Oozie instance.
163     */
164    public String getRuntimeDir() {
165        return runtimeDir;
166    }
167
168    /**
169     * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
170     *
171     * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
172     */
173    public String getSystemId() {
174        return systemId;
175    }
176
177    /**
178     * Set and set system mode.
179     *
180     * @param sysMode system mode
181     */
182
183    public synchronized void setSystemMode(SYSTEM_MODE sysMode) {
184        if (this.systemMode != sysMode) {
185            XLog log = XLog.getLog(getClass());
186            log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode);
187        }
188        this.systemMode = sysMode;
189    }
190
191    /**
192     * Return the services configuration.
193     *
194     * @return services configuration.
195     * @deprecated Use {@link ConfigurationService#get(String)} to retrieve property from oozie configurations.
196     */
197    @Deprecated
198    public Configuration getConf() {
199        return conf;
200    }
201
202    /**
203     * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property.
204     *
205     * @throws ServiceException thrown if any of the services could not initialize.
206     */
207    public void init() throws ServiceException {
208        XLog log = new XLog(LogFactory.getLog(getClass()));
209        log.trace("Initializing");
210        SERVICES = this;
211        try {
212            loadServices();
213        }
214        catch (RuntimeException rex) {
215            XLog.getLog(getClass()).fatal(rex.getMessage(), rex);
216            throw rex;
217        }
218        catch (ServiceException ex) {
219            XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
220            SERVICES = null;
221            throw ex;
222        }
223        InstrumentationService instrService = get(InstrumentationService.class);
224        if (instrService != null) {
225            Instrumentation instr = instrService.get();
226            for (Service service : services.values()) {
227                if (service instanceof Instrumentable) {
228                    ((Instrumentable) service).instrument(instr);
229                }
230            }
231            instr.addVariable("oozie", "version", new Instrumentation.Variable<String>() {
232                @Override
233                public String getValue() {
234                    return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION);
235                }
236            });
237            instr.addVariable("oozie", "mode", new Instrumentation.Variable<String>() {
238                @Override
239                public String getValue() {
240                    return getSystemMode().toString();
241                }
242            });
243        }
244        log.info("Initialized");
245        log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion());
246        log.info("Oozie System ID [{0}] started!", getSystemId());
247    }
248
249    /**
250     * Loads the specified services.
251     *
252     * @param classes services classes to load.
253     * @param list    list of loaded service in order of appearance in the
254     *                configuration.
255     * @throws ServiceException thrown if a service class could not be loaded.
256     */
257    private void loadServices(Class<?>[] classes, List<Service> list) throws ServiceException {
258        XLog log = new XLog(LogFactory.getLog(getClass()));
259        for (Class<?> klass : classes) {
260            try {
261                Service service = (Service) klass.newInstance();
262                log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(),
263                        service.getClass());
264                if (!service.getInterface().isInstance(service)) {
265                    throw new ServiceException(ErrorCode.E0101, klass, service.getInterface().getName());
266                }
267                list.add(service);
268            } catch (ServiceException ex) {
269                throw ex;
270            } catch (Exception ex) {
271                throw new ServiceException(ErrorCode.E0102, klass, ex.getMessage(), ex);
272            }
273        }
274    }
275
276    /**
277     * Loads services defined in <code>services</code> and
278     * <code>services.ext</code> and de-dups them.
279     *
280     * @return List of final services to initialize.
281     * @throws ServiceException throw if the services could not be loaded.
282     */
283    private void loadServices() throws ServiceException {
284        XLog log = new XLog(LogFactory.getLog(getClass()));
285        try {
286            Map<Class<?>, Service> map = new LinkedHashMap<Class<?>, Service>();
287            Class<?>[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES);
288            log.debug("Services list obtained from property '" + CONF_SERVICE_CLASSES + "'");
289            Class<?>[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES);
290            log.debug("Services list obtained from property '" + CONF_SERVICE_EXT_CLASSES + "'");
291            List<Service> list = new ArrayList<Service>();
292            loadServices(classes, list);
293            loadServices(classesExt, list);
294
295            //removing duplicate services, strategy: last one wins
296            for (Service service : list) {
297                if (map.containsKey(service.getInterface())) {
298                    log.debug("Replacing service [{0}] implementation [{1}]", service.getInterface(),
299                            service.getClass());
300                }
301                map.put(service.getInterface(), service);
302            }
303            for (Map.Entry<Class<?>, Service> entry : map.entrySet()) {
304                setService(entry.getValue().getClass());
305            }
306        } catch (RuntimeException rex) {
307            log.fatal("Runtime Exception during Services Load. Check your list of '{0}' or '{1}'",
308                    CONF_SERVICE_CLASSES, CONF_SERVICE_EXT_CLASSES, rex);
309            throw new ServiceException(ErrorCode.E0103, rex.getMessage(), rex);
310        }
311    }
312
313    /**
314     * Destroy all services.
315     */
316    public void destroy() {
317        XLog log = new XLog(LogFactory.getLog(getClass()));
318        log.trace("Shutting down");
319        boolean deleteRuntimeDir = false;
320        if (conf != null) {
321            deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false);
322        }
323        if (services != null) {
324            List<Service> list = new ArrayList<Service>(services.values());
325            Collections.reverse(list);
326            for (Service service : list) {
327                try {
328                    log.trace("Destroying service[{0}]", service.getInterface());
329                    if (service.getInterface() == XLogService.class) {
330                        log.info("Shutdown");
331                    }
332                    service.destroy();
333                }
334                catch (Throwable ex) {
335                    log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex);
336                }
337            }
338        }
339        if (deleteRuntimeDir) {
340            try {
341                IOUtils.delete(new File(runtimeDir));
342            }
343            catch (IOException ex) {
344                log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex);
345            }
346        }
347        services = null;
348        conf = null;
349        SERVICES = null;
350    }
351
352    /**
353     * Return a service by its public interface.
354     *
355     * @param serviceKlass service public interface.
356     * @return the associated service, or <code>null</code> if not define.
357     */
358    @SuppressWarnings("unchecked")
359    public <T extends Service> T get(Class<T> serviceKlass) {
360        return (T) services.get(serviceKlass);
361    }
362
363    /**
364     * Set a service programmatically. <p> The service will be initialized by the services. <p> If a service is
365     * already defined with the same public interface it will be destroyed.
366     *
367     * @param klass service klass
368     * @throws ServiceException if the service could not be initialized, at this point all services have been
369     * destroyed.
370     */
371    public void setService(Class<? extends Service> klass) throws ServiceException {
372        setServiceInternal(klass, true);
373    }
374
375    private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException {
376        try {
377            Service newService = (Service) ReflectionUtils.newInstance(klass, null);
378            Service oldService = services.get(newService.getInterface());
379            if (oldService != null) {
380                oldService.destroy();
381            }
382            if (logging) {
383                XLog log = new XLog(LogFactory.getLog(getClass()));
384                log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass());
385            }
386            newService.init(this);
387            services.put(newService.getInterface(), newService);
388        }
389        catch (ServiceException ex) {
390            XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
391            destroy();
392            throw ex;
393        }
394    }
395
396    /**
397     * Return the services singleton.
398     *
399     * @return services singleton, <code>null</code> if not initialized.
400     */
401    public static Services get() {
402        return SERVICES;
403    }
404
405}