001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.commons.logging.LogFactory;
021    import org.apache.hadoop.conf.Configuration;
022    import org.apache.hadoop.util.ReflectionUtils;
023    import org.apache.hadoop.util.VersionInfo;
024    import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
025    import org.apache.oozie.util.XLog;
026    import org.apache.oozie.util.Instrumentable;
027    import org.apache.oozie.util.IOUtils;
028    import org.apache.oozie.ErrorCode;
029    
030    import java.util.ArrayList;
031    import java.util.Collections;
032    import java.util.LinkedHashMap;
033    import java.util.List;
034    import java.util.Map;
035    import java.io.IOException;
036    import java.io.File;
037    
038    /**
039     * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p/> It has 2 built in
040     * services: {@link XLogService} and {@link ConfigurationService}. <p/> The rest of the services are loaded from the
041     * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces
042     * and enters are allowed). <p/> The {@link #CONF_SYSTEM_MODE} configuration property is any of
043     * NORMAL/SAFEMODE/NOWEBSERVICE. <p/> Services are loaded and initialized in the order they are defined in the in
044     * configuration property. <p/> After all services are initialized, if the Instrumentation service is present, all
045     * services that implement the {@link Instrumentable} are instrumented. <p/> Services are destroyed in reverse order.
046     * <p/> If services initialization fail, initialized services are immediatly destroyed.
047     */
048    public class Services {
049        private static final int MAX_SYSTEM_ID_LEN = 10;
050    
051        /**
052         * Environment variable that indicates the location of the Oozie home directory.
053         * The Oozie home directory is used to pick up the conf/ directory from
054         */
055        public static final String OOZIE_HOME_DIR = "oozie.home.dir";
056    
057        public static final String CONF_SYSTEM_ID = "oozie.system.id";
058    
059        public static final String CONF_SERVICE_CLASSES = "oozie.services";
060    
061        public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext";
062    
063        public static final String CONF_SYSTEM_MODE = "oozie.systemmode";
064    
065        public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown";
066    
067        private static Services SERVICES;
068    
069        private SYSTEM_MODE systemMode;
070        private String runtimeDir;
071        private Configuration conf;
072        private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>();
073        private String systemId;
074        private static String oozieHome;
075    
076        public static void setOozieHome() throws ServiceException {
077            oozieHome = System.getProperty(OOZIE_HOME_DIR);
078            if (oozieHome == null) {
079                throw new ServiceException(ErrorCode.E0000);
080            }
081            if (!oozieHome.startsWith("/")) {
082                throw new ServiceException(ErrorCode.E0003, oozieHome);
083            }
084            File file = new File(oozieHome);
085            if (!file.exists()) {
086                throw new ServiceException(ErrorCode.E0004, oozieHome);
087            }
088        }
089    
090        public static String getOozieHome() throws ServiceException {
091            return oozieHome;
092        }
093    
094        /**
095         * Create a services. <p/> The built in services are initialized.
096         *
097         * @throws ServiceException thrown if any of the built in services could not initialize.
098         */
099        public Services() throws ServiceException {
100            setOozieHome();
101            if (SERVICES != null) {
102                XLog log = XLog.getLog(getClass());
103                log.warn(XLog.OPS, "Previous services singleton active, destroying it");
104                SERVICES.destroy();
105                SERVICES = null;
106            }
107            setServiceInternal(XLogService.class, false);
108            setServiceInternal(ConfigurationService.class, true);
109            conf = get(ConfigurationService.class).getConf();
110            systemId = conf.get(CONF_SYSTEM_ID, ("oozie-" + System.getProperty("user.name")));
111            if (systemId.length() > MAX_SYSTEM_ID_LEN) {
112                systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN);
113                XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId,
114                                             MAX_SYSTEM_ID_LEN);
115            }
116            setSystemMode(SYSTEM_MODE.valueOf(conf.get(CONF_SYSTEM_MODE, SYSTEM_MODE.NORMAL.toString())));
117            runtimeDir = createRuntimeDir();
118        }
119    
120        private String createRuntimeDir() throws ServiceException {
121            try {
122                File file = File.createTempFile(getSystemId(), ".dir");
123                file.delete();
124                if (!file.mkdir()) {
125                    ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath());
126                    XLog.getLog(getClass()).fatal(ex);
127                    throw ex;
128                }
129                XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath());
130                return file.getAbsolutePath();
131            }
132            catch (IOException ex) {
133                ServiceException sex = new ServiceException(ErrorCode.E0001, ex);
134                XLog.getLog(getClass()).fatal(ex);
135                throw sex;
136            }
137        }
138    
139        /**
140         * Return active system mode. <p/> .
141         *
142         * @return
143         */
144    
145        public SYSTEM_MODE getSystemMode() {
146            return systemMode;
147        }
148    
149        /**
150         * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
151         * new directory per Services initialization.
152         *
153         * @return the runtime directory of the Oozie instance.
154         */
155        public String getRuntimeDir() {
156            return runtimeDir;
157        }
158    
159        /**
160         * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
161         *
162         * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
163         */
164        public String getSystemId() {
165            return systemId;
166        }
167    
168        /**
169         * Set and set system mode.
170         *
171         * @param sysMode system mode
172         */
173    
174        public synchronized void setSystemMode(SYSTEM_MODE sysMode) {
175            if (this.systemMode != sysMode) {
176                XLog log = XLog.getLog(getClass());
177                log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode);
178            }
179            this.systemMode = sysMode;
180        }
181    
182        /**
183         * Return the services configuration.
184         *
185         * @return services configuraiton.
186         */
187        public Configuration getConf() {
188            return conf;
189        }
190    
191        /**
192         * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property.
193         *
194         * @throws ServiceException thrown if any of the services could not initialize.
195         */
196        @SuppressWarnings("unchecked")
197        public void init() throws ServiceException {
198            XLog log = new XLog(LogFactory.getLog(getClass()));
199            log.trace("Initializing");
200            SERVICES = this;
201            try {
202                loadServices();
203            }
204            catch (RuntimeException ex) {
205                XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
206                throw ex;
207            }
208            catch (ServiceException ex) {
209                SERVICES = null;
210                throw ex;
211            }
212            InstrumentationService instrService = get(InstrumentationService.class);
213            if (instrService != null) {
214                for (Service service : services.values()) {
215                    if (service instanceof Instrumentable) {
216                        ((Instrumentable) service).instrument(instrService.get());
217                    }
218                }
219            }
220            log.info("Initialized");
221            log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion());
222            log.info("Oozie System ID [{0}] started!", getSystemId());
223        }
224    
225        /**
226         * Loads the specified services.
227         *
228         * @param classes services classes to load.
229         * @param list    list of loaded service in order of appearance in the
230         *                configuration.
231         * @throws ServiceException thrown if a service class could not be loaded.
232         */
233        private void loadServices(Class[] classes, List<Service> list) throws ServiceException {
234            XLog log = new XLog(LogFactory.getLog(getClass()));
235            for (Class klass : classes) {
236                try {
237                    Service service = (Service) klass.newInstance();
238                    log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(),
239                            service.getClass());
240                    if (!service.getInterface().isInstance(service)) {
241                        throw new ServiceException(ErrorCode.E0101, klass, service.getInterface().getName());
242                    }
243                    list.add(service);
244                } catch (ServiceException ex) {
245                    throw ex;
246                } catch (Exception ex) {
247                    throw new ServiceException(ErrorCode.E0102, klass, ex.getMessage(), ex);
248                }
249            }
250        }
251    
252        /**
253         * Loads services defined in <code>services</code> and
254         * <code>services.ext</code> and de-dups them.
255         *
256         * @return List of final services to initialize.
257         * @throws ServiceException throw if the services could not be loaded.
258         */
259        private void loadServices() throws ServiceException {
260            XLog log = new XLog(LogFactory.getLog(getClass()));
261            try {
262                Map<Class, Service> map = new LinkedHashMap<Class, Service>();
263                Class[] classes = conf.getClasses(CONF_SERVICE_CLASSES);
264                Class[] classesExt = conf.getClasses(CONF_SERVICE_EXT_CLASSES);
265                List<Service> list = new ArrayList<Service>();
266                loadServices(classes, list);
267                loadServices(classesExt, list);
268    
269                //removing duplicate services, strategy: last one wins
270                for (Service service : list) {
271                    if (map.containsKey(service.getInterface())) {
272                        log.debug("Replacing service [{0}] implementation [{1}]", service.getInterface(),
273                                service.getClass());
274                    }
275                    map.put(service.getInterface(), service);
276                }
277                for (Map.Entry<Class, Service> entry : map.entrySet()) {
278                    setService(entry.getValue().getClass());
279                }
280            } catch (RuntimeException ex) {
281                throw new ServiceException(ErrorCode.E0103, ex.getMessage(), ex);
282            }
283        }
284    
285        /**
286         * Destroy all services.
287         */
288        public void destroy() {
289            XLog log = new XLog(LogFactory.getLog(getClass()));
290            log.trace("Shutting down");
291            boolean deleteRuntimeDir = false;
292            if (conf != null) {
293                deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false);
294            }
295            if (services != null) {
296                List<Service> list = new ArrayList<Service>(services.values());
297                Collections.reverse(list);
298                for (Service service : list) {
299                    try {
300                        log.trace("Destroying service[{0}]", service.getInterface());
301                        if (service.getInterface() == XLogService.class) {
302                            log.info("Shutdown");
303                        }
304                        service.destroy();
305                    }
306                    catch (Throwable ex) {
307                        log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex);
308                    }
309                }
310            }
311            if (deleteRuntimeDir) {
312                try {
313                    IOUtils.delete(new File(runtimeDir));
314                }
315                catch (IOException ex) {
316                    log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex);
317                }
318            }
319            services = null;
320            conf = null;
321            SERVICES = null;
322        }
323    
324        /**
325         * Return a service by its public interface.
326         *
327         * @param serviceKlass service public interface.
328         * @return the associated service, or <code>null</code> if not define.
329         */
330        @SuppressWarnings("unchecked")
331        public <T extends Service> T get(Class<T> serviceKlass) {
332            return (T) services.get(serviceKlass);
333        }
334    
335        /**
336         * Set a service programmatically. <p/> The service will be initialized by the services. <p/> If a service is
337         * already defined with the same public interface it will be destroyed.
338         *
339         * @param klass service klass
340         * @throws ServiceException if the service could not be initialized, at this point all services have been
341         * destroyed.
342         */
343        public void setService(Class<? extends Service> klass) throws ServiceException {
344            setServiceInternal(klass, true);
345        }
346    
347        private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException {
348            try {
349                Service newService = (Service) ReflectionUtils.newInstance(klass, null);
350                Service oldService = services.get(newService.getInterface());
351                if (oldService != null) {
352                    oldService.destroy();
353                }
354                if (logging) {
355                    XLog log = new XLog(LogFactory.getLog(getClass()));
356                    log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass());
357                }
358                newService.init(this);
359                services.put(newService.getInterface(), newService);
360            }
361            catch (ServiceException ex) {
362                XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
363                destroy();
364                throw ex;
365            }
366        }
367    
368        /**
369         * Return the services singleton.
370         *
371         * @return services singleton, <code>null</code> if not initialized.
372         */
373        public static Services get() {
374            return SERVICES;
375        }
376    
377    }