001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import org.apache.commons.logging.LogFactory;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.util.ReflectionUtils;
023import org.apache.hadoop.util.VersionInfo;
024import org.apache.oozie.BuildInfo;
025import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
026import org.apache.oozie.util.DateUtils;
027import org.apache.oozie.util.XLog;
028import org.apache.oozie.util.Instrumentable;
029import org.apache.oozie.util.Instrumentation;
030import org.apache.oozie.util.IOUtils;
031import org.apache.oozie.ErrorCode;
032
033import java.util.ArrayList;
034import java.util.Collections;
035import java.util.LinkedHashMap;
036import java.util.List;
037import java.util.Map;
038import java.io.IOException;
039import java.io.File;
040
041/**
042 * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p/> It has 2 built in
043 * services: {@link XLogService} and {@link ConfigurationService}. <p/> The rest of the services are loaded from the
044 * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces
045 * and enters are allowed). <p/> The {@link #CONF_SYSTEM_MODE} configuration property is any of
046 * NORMAL/SAFEMODE/NOWEBSERVICE. <p/> Services are loaded and initialized in the order they are defined in the in
047 * configuration property. <p/> After all services are initialized, if the Instrumentation service is present, all
048 * services that implement the {@link Instrumentable} are instrumented. <p/> Services are destroyed in reverse order.
049 * <p/> If services initialization fail, initialized services are immediatly destroyed.
050 */
051public class Services {
052    private static final int MAX_SYSTEM_ID_LEN = 10;
053
054    /**
055     * Environment variable that indicates the location of the Oozie home directory.
056     * The Oozie home directory is used to pick up the conf/ directory from
057     */
058    public static final String OOZIE_HOME_DIR = "oozie.home.dir";
059
060    public static final String CONF_SYSTEM_ID = "oozie.system.id";
061
062    public static final String CONF_SERVICE_CLASSES = "oozie.services";
063
064    public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext";
065
066    public static final String CONF_SYSTEM_MODE = "oozie.systemmode";
067
068    public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown";
069
070    private static Services SERVICES;
071
072    private SYSTEM_MODE systemMode;
073    private String runtimeDir;
074    private Configuration conf;
075    private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>();
076    private String systemId;
077    private static String oozieHome;
078
079    public static void setOozieHome() throws ServiceException {
080        oozieHome = System.getProperty(OOZIE_HOME_DIR);
081        if (oozieHome == null) {
082            throw new ServiceException(ErrorCode.E0000);
083        }
084        File file = new File(oozieHome);
085        if (!file.isAbsolute()) {
086            throw new ServiceException(ErrorCode.E0003, oozieHome);
087        }
088        if (!file.exists()) {
089            throw new ServiceException(ErrorCode.E0004, oozieHome);
090        }
091    }
092
093    public static String getOozieHome() throws ServiceException {
094        return oozieHome;
095    }
096
097    /**
098     * Create a services. <p/> The built in services are initialized.
099     *
100     * @throws ServiceException thrown if any of the built in services could not initialize.
101     */
102    public Services() throws ServiceException {
103        setOozieHome();
104        if (SERVICES != null) {
105            XLog log = XLog.getLog(getClass());
106            log.warn(XLog.OPS, "Previous services singleton active, destroying it");
107            SERVICES.destroy();
108            SERVICES = null;
109        }
110        setServiceInternal(XLogService.class, false);
111        setServiceInternal(ConfigurationService.class, true);
112        conf = get(ConfigurationService.class).getConf();
113        DateUtils.setConf(conf);
114        if (!DateUtils.getOozieProcessingTimeZone().equals(DateUtils.UTC)) {
115            XLog.getLog(getClass()).warn("Oozie configured to work in a timezone other than UTC: {0}",
116                                         DateUtils.getOozieProcessingTimeZone().getID());
117        }
118        systemId = conf.get(CONF_SYSTEM_ID, ("oozie-" + System.getProperty("user.name")));
119        if (systemId.length() > MAX_SYSTEM_ID_LEN) {
120            systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN);
121            XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId,
122                                         MAX_SYSTEM_ID_LEN);
123        }
124        setSystemMode(SYSTEM_MODE.valueOf(conf.get(CONF_SYSTEM_MODE, SYSTEM_MODE.NORMAL.toString())));
125        runtimeDir = createRuntimeDir();
126    }
127
128    private String createRuntimeDir() throws ServiceException {
129        try {
130            File file = File.createTempFile(getSystemId(), ".dir");
131            file.delete();
132            if (!file.mkdir()) {
133                ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath());
134                XLog.getLog(getClass()).fatal(ex);
135                throw ex;
136            }
137            XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath());
138            return file.getAbsolutePath();
139        }
140        catch (IOException ex) {
141            ServiceException sex = new ServiceException(ErrorCode.E0001, "", ex);
142            XLog.getLog(getClass()).fatal(ex);
143            throw sex;
144        }
145    }
146
147    /**
148     * Return active system mode. <p/> .
149     *
150     * @return
151     */
152
153    public SYSTEM_MODE getSystemMode() {
154        return systemMode;
155    }
156
157    /**
158     * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
159     * new directory per Services initialization.
160     *
161     * @return the runtime directory of the Oozie instance.
162     */
163    public String getRuntimeDir() {
164        return runtimeDir;
165    }
166
167    /**
168     * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
169     *
170     * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
171     */
172    public String getSystemId() {
173        return systemId;
174    }
175
176    /**
177     * Set and set system mode.
178     *
179     * @param sysMode system mode
180     */
181
182    public synchronized void setSystemMode(SYSTEM_MODE sysMode) {
183        if (this.systemMode != sysMode) {
184            XLog log = XLog.getLog(getClass());
185            log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode);
186        }
187        this.systemMode = sysMode;
188    }
189
190    /**
191     * Return the services configuration.
192     *
193     * @return services configuraiton.
194     */
195    public Configuration getConf() {
196        return conf;
197    }
198
199    /**
200     * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property.
201     *
202     * @throws ServiceException thrown if any of the services could not initialize.
203     */
204    @SuppressWarnings("unchecked")
205    public void init() throws ServiceException {
206        XLog log = new XLog(LogFactory.getLog(getClass()));
207        log.trace("Initializing");
208        SERVICES = this;
209        try {
210            loadServices();
211        }
212        catch (RuntimeException rex) {
213            XLog.getLog(getClass()).fatal(rex.getMessage(), rex);
214            throw rex;
215        }
216        catch (ServiceException ex) {
217            XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
218            SERVICES = null;
219            throw ex;
220        }
221        InstrumentationService instrService = get(InstrumentationService.class);
222        if (instrService != null) {
223            Instrumentation instr = instrService.get();
224            for (Service service : services.values()) {
225                if (service instanceof Instrumentable) {
226                    ((Instrumentable) service).instrument(instr);
227                }
228            }
229            instr.addVariable("oozie", "version", new Instrumentation.Variable<String>() {
230                @Override
231                public String getValue() {
232                    return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION);
233                }
234            });
235            instr.addVariable("oozie", "mode", new Instrumentation.Variable<String>() {
236                @Override
237                public String getValue() {
238                    return getSystemMode().toString();
239                }
240            });
241        }
242        log.info("Initialized");
243        log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion());
244        log.info("Oozie System ID [{0}] started!", getSystemId());
245    }
246
247    /**
248     * Loads the specified services.
249     *
250     * @param classes services classes to load.
251     * @param list    list of loaded service in order of appearance in the
252     *                configuration.
253     * @throws ServiceException thrown if a service class could not be loaded.
254     */
255    private void loadServices(Class[] classes, List<Service> list) throws ServiceException {
256        XLog log = new XLog(LogFactory.getLog(getClass()));
257        for (Class klass : classes) {
258            try {
259                Service service = (Service) klass.newInstance();
260                log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(),
261                        service.getClass());
262                if (!service.getInterface().isInstance(service)) {
263                    throw new ServiceException(ErrorCode.E0101, klass, service.getInterface().getName());
264                }
265                list.add(service);
266            } catch (ServiceException ex) {
267                throw ex;
268            } catch (Exception ex) {
269                throw new ServiceException(ErrorCode.E0102, klass, ex.getMessage(), ex);
270            }
271        }
272    }
273
274    /**
275     * Loads services defined in <code>services</code> and
276     * <code>services.ext</code> and de-dups them.
277     *
278     * @return List of final services to initialize.
279     * @throws ServiceException throw if the services could not be loaded.
280     */
281    private void loadServices() throws ServiceException {
282        XLog log = new XLog(LogFactory.getLog(getClass()));
283        try {
284            Map<Class, Service> map = new LinkedHashMap<Class, Service>();
285            Class[] classes = conf.getClasses(CONF_SERVICE_CLASSES);
286            log.debug("Services list obtained from property '" + CONF_SERVICE_CLASSES + "'");
287            Class[] classesExt = conf.getClasses(CONF_SERVICE_EXT_CLASSES);
288            log.debug("Services list obtained from property '" + CONF_SERVICE_EXT_CLASSES + "'");
289            List<Service> list = new ArrayList<Service>();
290            loadServices(classes, list);
291            loadServices(classesExt, list);
292
293            //removing duplicate services, strategy: last one wins
294            for (Service service : list) {
295                if (map.containsKey(service.getInterface())) {
296                    log.debug("Replacing service [{0}] implementation [{1}]", service.getInterface(),
297                            service.getClass());
298                }
299                map.put(service.getInterface(), service);
300            }
301            for (Map.Entry<Class, Service> entry : map.entrySet()) {
302                setService(entry.getValue().getClass());
303            }
304        } catch (RuntimeException rex) {
305            log.fatal("Runtime Exception during Services Load. Check your list of '" + CONF_SERVICE_CLASSES + "' or '" + CONF_SERVICE_EXT_CLASSES + "'");
306            throw new ServiceException(ErrorCode.E0103, rex.getMessage(), rex);
307        }
308    }
309
310    /**
311     * Destroy all services.
312     */
313    public void destroy() {
314        XLog log = new XLog(LogFactory.getLog(getClass()));
315        log.trace("Shutting down");
316        boolean deleteRuntimeDir = false;
317        if (conf != null) {
318            deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false);
319        }
320        if (services != null) {
321            List<Service> list = new ArrayList<Service>(services.values());
322            Collections.reverse(list);
323            for (Service service : list) {
324                try {
325                    log.trace("Destroying service[{0}]", service.getInterface());
326                    if (service.getInterface() == XLogService.class) {
327                        log.info("Shutdown");
328                    }
329                    service.destroy();
330                }
331                catch (Throwable ex) {
332                    log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex);
333                }
334            }
335        }
336        if (deleteRuntimeDir) {
337            try {
338                IOUtils.delete(new File(runtimeDir));
339            }
340            catch (IOException ex) {
341                log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex);
342            }
343        }
344        services = null;
345        conf = null;
346        SERVICES = null;
347    }
348
349    /**
350     * Return a service by its public interface.
351     *
352     * @param serviceKlass service public interface.
353     * @return the associated service, or <code>null</code> if not define.
354     */
355    @SuppressWarnings("unchecked")
356    public <T extends Service> T get(Class<T> serviceKlass) {
357        return (T) services.get(serviceKlass);
358    }
359
360    /**
361     * Set a service programmatically. <p/> The service will be initialized by the services. <p/> If a service is
362     * already defined with the same public interface it will be destroyed.
363     *
364     * @param klass service klass
365     * @throws ServiceException if the service could not be initialized, at this point all services have been
366     * destroyed.
367     */
368    public void setService(Class<? extends Service> klass) throws ServiceException {
369        setServiceInternal(klass, true);
370    }
371
372    private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException {
373        try {
374            Service newService = (Service) ReflectionUtils.newInstance(klass, null);
375            Service oldService = services.get(newService.getInterface());
376            if (oldService != null) {
377                oldService.destroy();
378            }
379            if (logging) {
380                XLog log = new XLog(LogFactory.getLog(getClass()));
381                log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass());
382            }
383            newService.init(this);
384            services.put(newService.getInterface(), newService);
385        }
386        catch (ServiceException ex) {
387            XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
388            destroy();
389            throw ex;
390        }
391    }
392
393    /**
394     * Return the services singleton.
395     *
396     * @return services singleton, <code>null</code> if not initialized.
397     */
398    public static Services get() {
399        return SERVICES;
400    }
401
402}