001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.commons.logging.LogFactory;
021    import org.apache.hadoop.conf.Configuration;
022    import org.apache.hadoop.util.ReflectionUtils;
023    import org.apache.hadoop.util.VersionInfo;
024    import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
025    import org.apache.oozie.util.DateUtils;
026    import org.apache.oozie.util.XLog;
027    import org.apache.oozie.util.Instrumentable;
028    import org.apache.oozie.util.IOUtils;
029    import org.apache.oozie.ErrorCode;
030    
031    import java.util.ArrayList;
032    import java.util.Collections;
033    import java.util.LinkedHashMap;
034    import java.util.List;
035    import java.util.Map;
036    import java.io.IOException;
037    import java.io.File;
038    
039    /**
040     * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p/> It has 2 built in
041     * services: {@link XLogService} and {@link ConfigurationService}. <p/> The rest of the services are loaded from the
042     * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces
043     * and enters are allowed). <p/> The {@link #CONF_SYSTEM_MODE} configuration property is any of
044     * NORMAL/SAFEMODE/NOWEBSERVICE. <p/> Services are loaded and initialized in the order they are defined in the in
045     * configuration property. <p/> After all services are initialized, if the Instrumentation service is present, all
046     * services that implement the {@link Instrumentable} are instrumented. <p/> Services are destroyed in reverse order.
047     * <p/> If services initialization fail, initialized services are immediatly destroyed.
048     */
049    public class Services {
050        private static final int MAX_SYSTEM_ID_LEN = 10;
051    
052        /**
053         * Environment variable that indicates the location of the Oozie home directory.
054         * The Oozie home directory is used to pick up the conf/ directory from
055         */
056        public static final String OOZIE_HOME_DIR = "oozie.home.dir";
057    
058        public static final String CONF_SYSTEM_ID = "oozie.system.id";
059    
060        public static final String CONF_SERVICE_CLASSES = "oozie.services";
061    
062        public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext";
063    
064        public static final String CONF_SYSTEM_MODE = "oozie.systemmode";
065    
066        public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown";
067    
068        private static Services SERVICES;
069    
070        private SYSTEM_MODE systemMode;
071        private String runtimeDir;
072        private Configuration conf;
073        private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>();
074        private String systemId;
075        private static String oozieHome;
076    
077        public static void setOozieHome() throws ServiceException {
078            oozieHome = System.getProperty(OOZIE_HOME_DIR);
079            if (oozieHome == null) {
080                throw new ServiceException(ErrorCode.E0000);
081            }
082            if (!oozieHome.startsWith("/")) {
083                throw new ServiceException(ErrorCode.E0003, oozieHome);
084            }
085            File file = new File(oozieHome);
086            if (!file.exists()) {
087                throw new ServiceException(ErrorCode.E0004, oozieHome);
088            }
089        }
090    
091        public static String getOozieHome() throws ServiceException {
092            return oozieHome;
093        }
094    
095        /**
096         * Create a services. <p/> The built in services are initialized.
097         *
098         * @throws ServiceException thrown if any of the built in services could not initialize.
099         */
100        public Services() throws ServiceException {
101            setOozieHome();
102            if (SERVICES != null) {
103                XLog log = XLog.getLog(getClass());
104                log.warn(XLog.OPS, "Previous services singleton active, destroying it");
105                SERVICES.destroy();
106                SERVICES = null;
107            }
108            setServiceInternal(XLogService.class, false);
109            setServiceInternal(ConfigurationService.class, true);
110            conf = get(ConfigurationService.class).getConf();
111            DateUtils.setConf(conf);
112            if (!DateUtils.getOozieProcessingTimeZone().equals(DateUtils.UTC)) {
113                XLog.getLog(getClass()).warn("Oozie configured to work in a timezone other than UTC: {0}",
114                                             DateUtils.getOozieProcessingTimeZone().getID());
115            }
116            systemId = conf.get(CONF_SYSTEM_ID, ("oozie-" + System.getProperty("user.name")));
117            if (systemId.length() > MAX_SYSTEM_ID_LEN) {
118                systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN);
119                XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId,
120                                             MAX_SYSTEM_ID_LEN);
121            }
122            setSystemMode(SYSTEM_MODE.valueOf(conf.get(CONF_SYSTEM_MODE, SYSTEM_MODE.NORMAL.toString())));
123            runtimeDir = createRuntimeDir();
124        }
125    
126        private String createRuntimeDir() throws ServiceException {
127            try {
128                File file = File.createTempFile(getSystemId(), ".dir");
129                file.delete();
130                if (!file.mkdir()) {
131                    ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath());
132                    XLog.getLog(getClass()).fatal(ex);
133                    throw ex;
134                }
135                XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath());
136                return file.getAbsolutePath();
137            }
138            catch (IOException ex) {
139                ServiceException sex = new ServiceException(ErrorCode.E0001, "", ex);
140                XLog.getLog(getClass()).fatal(ex);
141                throw sex;
142            }
143        }
144    
145        /**
146         * Return active system mode. <p/> .
147         *
148         * @return
149         */
150    
151        public SYSTEM_MODE getSystemMode() {
152            return systemMode;
153        }
154    
155        /**
156         * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
157         * new directory per Services initialization.
158         *
159         * @return the runtime directory of the Oozie instance.
160         */
161        public String getRuntimeDir() {
162            return runtimeDir;
163        }
164    
165        /**
166         * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
167         *
168         * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
169         */
170        public String getSystemId() {
171            return systemId;
172        }
173    
174        /**
175         * Set and set system mode.
176         *
177         * @param sysMode system mode
178         */
179    
180        public synchronized void setSystemMode(SYSTEM_MODE sysMode) {
181            if (this.systemMode != sysMode) {
182                XLog log = XLog.getLog(getClass());
183                log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode);
184            }
185            this.systemMode = sysMode;
186        }
187    
188        /**
189         * Return the services configuration.
190         *
191         * @return services configuraiton.
192         */
193        public Configuration getConf() {
194            return conf;
195        }
196    
197        /**
198         * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property.
199         *
200         * @throws ServiceException thrown if any of the services could not initialize.
201         */
202        @SuppressWarnings("unchecked")
203        public void init() throws ServiceException {
204            XLog log = new XLog(LogFactory.getLog(getClass()));
205            log.trace("Initializing");
206            SERVICES = this;
207            try {
208                loadServices();
209            }
210            catch (RuntimeException rex) {
211                XLog.getLog(getClass()).fatal(rex.getMessage(), rex);
212                throw rex;
213            }
214            catch (ServiceException ex) {
215                XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
216                SERVICES = null;
217                throw ex;
218            }
219            InstrumentationService instrService = get(InstrumentationService.class);
220            if (instrService != null) {
221                for (Service service : services.values()) {
222                    if (service instanceof Instrumentable) {
223                        ((Instrumentable) service).instrument(instrService.get());
224                    }
225                }
226            }
227            log.info("Initialized");
228            log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion());
229            log.info("Oozie System ID [{0}] started!", getSystemId());
230        }
231    
232        /**
233         * Loads the specified services.
234         *
235         * @param classes services classes to load.
236         * @param list    list of loaded service in order of appearance in the
237         *                configuration.
238         * @throws ServiceException thrown if a service class could not be loaded.
239         */
240        private void loadServices(Class[] classes, List<Service> list) throws ServiceException {
241            XLog log = new XLog(LogFactory.getLog(getClass()));
242            for (Class klass : classes) {
243                try {
244                    Service service = (Service) klass.newInstance();
245                    log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(),
246                            service.getClass());
247                    if (!service.getInterface().isInstance(service)) {
248                        throw new ServiceException(ErrorCode.E0101, klass, service.getInterface().getName());
249                    }
250                    list.add(service);
251                } catch (ServiceException ex) {
252                    throw ex;
253                } catch (Exception ex) {
254                    throw new ServiceException(ErrorCode.E0102, klass, ex.getMessage(), ex);
255                }
256            }
257        }
258    
259        /**
260         * Loads services defined in <code>services</code> and
261         * <code>services.ext</code> and de-dups them.
262         *
263         * @return List of final services to initialize.
264         * @throws ServiceException throw if the services could not be loaded.
265         */
266        private void loadServices() throws ServiceException {
267            XLog log = new XLog(LogFactory.getLog(getClass()));
268            try {
269                Map<Class, Service> map = new LinkedHashMap<Class, Service>();
270                Class[] classes = conf.getClasses(CONF_SERVICE_CLASSES);
271                log.debug("Services list obtained from property '" + CONF_SERVICE_CLASSES + "'");
272                Class[] classesExt = conf.getClasses(CONF_SERVICE_EXT_CLASSES);
273                log.debug("Services list obtained from property '" + CONF_SERVICE_EXT_CLASSES + "'");
274                List<Service> list = new ArrayList<Service>();
275                loadServices(classes, list);
276                loadServices(classesExt, list);
277    
278                //removing duplicate services, strategy: last one wins
279                for (Service service : list) {
280                    if (map.containsKey(service.getInterface())) {
281                        log.debug("Replacing service [{0}] implementation [{1}]", service.getInterface(),
282                                service.getClass());
283                    }
284                    map.put(service.getInterface(), service);
285                }
286                for (Map.Entry<Class, Service> entry : map.entrySet()) {
287                    setService(entry.getValue().getClass());
288                }
289            } catch (RuntimeException rex) {
290                log.fatal("Runtime Exception during Services Load. Check your list of '" + CONF_SERVICE_CLASSES + "' or '" + CONF_SERVICE_EXT_CLASSES + "'");
291                throw new ServiceException(ErrorCode.E0103, rex.getMessage(), rex);
292            }
293        }
294    
295        /**
296         * Destroy all services.
297         */
298        public void destroy() {
299            XLog log = new XLog(LogFactory.getLog(getClass()));
300            log.trace("Shutting down");
301            boolean deleteRuntimeDir = false;
302            if (conf != null) {
303                deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false);
304            }
305            if (services != null) {
306                List<Service> list = new ArrayList<Service>(services.values());
307                Collections.reverse(list);
308                for (Service service : list) {
309                    try {
310                        log.trace("Destroying service[{0}]", service.getInterface());
311                        if (service.getInterface() == XLogService.class) {
312                            log.info("Shutdown");
313                        }
314                        service.destroy();
315                    }
316                    catch (Throwable ex) {
317                        log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex);
318                    }
319                }
320            }
321            if (deleteRuntimeDir) {
322                try {
323                    IOUtils.delete(new File(runtimeDir));
324                }
325                catch (IOException ex) {
326                    log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex);
327                }
328            }
329            services = null;
330            conf = null;
331            SERVICES = null;
332        }
333    
334        /**
335         * Return a service by its public interface.
336         *
337         * @param serviceKlass service public interface.
338         * @return the associated service, or <code>null</code> if not define.
339         */
340        @SuppressWarnings("unchecked")
341        public <T extends Service> T get(Class<T> serviceKlass) {
342            return (T) services.get(serviceKlass);
343        }
344    
345        /**
346         * Set a service programmatically. <p/> The service will be initialized by the services. <p/> If a service is
347         * already defined with the same public interface it will be destroyed.
348         *
349         * @param klass service klass
350         * @throws ServiceException if the service could not be initialized, at this point all services have been
351         * destroyed.
352         */
353        public void setService(Class<? extends Service> klass) throws ServiceException {
354            setServiceInternal(klass, true);
355        }
356    
357        private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException {
358            try {
359                Service newService = (Service) ReflectionUtils.newInstance(klass, null);
360                Service oldService = services.get(newService.getInterface());
361                if (oldService != null) {
362                    oldService.destroy();
363                }
364                if (logging) {
365                    XLog log = new XLog(LogFactory.getLog(getClass()));
366                    log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass());
367                }
368                newService.init(this);
369                services.put(newService.getInterface(), newService);
370            }
371            catch (ServiceException ex) {
372                XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
373                destroy();
374                throw ex;
375            }
376        }
377    
378        /**
379         * Return the services singleton.
380         *
381         * @return services singleton, <code>null</code> if not initialized.
382         */
383        public static Services get() {
384            return SERVICES;
385        }
386    
387    }