001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.commons.logging.LogFactory;
021    import org.apache.hadoop.conf.Configuration;
022    import org.apache.hadoop.util.ReflectionUtils;
023    import org.apache.hadoop.util.VersionInfo;
024    import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
025    import org.apache.oozie.util.XLog;
026    import org.apache.oozie.util.Instrumentable;
027    import org.apache.oozie.util.IOUtils;
028    import org.apache.oozie.ErrorCode;
029    
030    import java.util.ArrayList;
031    import java.util.Collections;
032    import java.util.LinkedHashMap;
033    import java.util.List;
034    import java.util.Map;
035    import java.io.IOException;
036    import java.io.File;
037    
038    /**
039     * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p/> It has 2 built in
040     * services: {@link XLogService} and {@link ConfigurationService}. <p/> The rest of the services are loaded from the
041     * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces
042     * and enters are allowed). <p/> The {@link #CONF_SYSTEM_MODE} configuration property is any of
043     * NORMAL/SAFEMODE/NOWEBSERVICE. <p/> Services are loaded and initialized in the order they are defined in the in
044     * configuration property. <p/> After all services are initialized, if the Instrumentation service is present, all
045     * services that implement the {@link Instrumentable} are instrumented. <p/> Services are destroyed in reverse order.
046     * <p/> If services initialization fail, initialized services are immediatly destroyed.
047     */
048    public class Services {
049        private static final int MAX_SYSTEM_ID_LEN = 10;
050    
051        /**
052         * Environment variable that indicates the location of the Oozie home directory.
053         * The Oozie home directory is used to pick up the conf/ directory from
054         */
055        public static final String OOZIE_HOME_DIR = "oozie.home.dir";
056    
057        public static final String CONF_SYSTEM_ID = "oozie.system.id";
058    
059        public static final String CONF_SERVICE_CLASSES = "oozie.services";
060    
061        public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext";
062    
063        public static final String CONF_SYSTEM_MODE = "oozie.systemmode";
064    
065        public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown";
066    
067        private static Services SERVICES;
068    
069        private SYSTEM_MODE systemMode;
070        private String runtimeDir;
071        private Configuration conf;
072        private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>();
073        private String systemId;
074        private static String oozieHome;
075    
076        public static void setOozieHome() throws ServiceException {
077            oozieHome = System.getProperty(OOZIE_HOME_DIR);
078            if (oozieHome == null) {
079                throw new ServiceException(ErrorCode.E0000);
080            }
081            if (!oozieHome.startsWith("/")) {
082                throw new ServiceException(ErrorCode.E0003, oozieHome);
083            }
084            File file = new File(oozieHome);
085            if (!file.exists()) {
086                throw new ServiceException(ErrorCode.E0004, oozieHome);
087            }
088        }
089    
090        public static String getOozieHome() throws ServiceException {
091            return oozieHome;
092        }
093    
094        /**
095         * Create a services. <p/> The built in services are initialized.
096         *
097         * @throws ServiceException thrown if any of the built in services could not initialize.
098         */
099        public Services() throws ServiceException {
100            setOozieHome();
101            if (SERVICES != null) {
102                XLog log = XLog.getLog(getClass());
103                log.warn(XLog.OPS, "Previous services singleton active, destroying it");
104                SERVICES.destroy();
105                SERVICES = null;
106            }
107            setServiceInternal(XLogService.class, false);
108            setServiceInternal(ConfigurationService.class, true);
109            conf = get(ConfigurationService.class).getConf();
110            systemId = conf.get(CONF_SYSTEM_ID, ("oozie-" + System.getProperty("user.name")));
111            if (systemId.length() > MAX_SYSTEM_ID_LEN) {
112                systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN);
113                XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId,
114                                             MAX_SYSTEM_ID_LEN);
115            }
116            setSystemMode(SYSTEM_MODE.valueOf(conf.get(CONF_SYSTEM_MODE, SYSTEM_MODE.NORMAL.toString())));
117            runtimeDir = createRuntimeDir();
118        }
119    
120        private String createRuntimeDir() throws ServiceException {
121            try {
122                File file = File.createTempFile(getSystemId(), ".dir");
123                file.delete();
124                if (!file.mkdir()) {
125                    ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath());
126                    XLog.getLog(getClass()).fatal(ex);
127                    throw ex;
128                }
129                XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath());
130                return file.getAbsolutePath();
131            }
132            catch (IOException ex) {
133                ServiceException sex = new ServiceException(ErrorCode.E0001, ex);
134                XLog.getLog(getClass()).fatal(ex);
135                throw sex;
136            }
137        }
138    
139        /**
140         * Return active system mode. <p/> .
141         *
142         * @return
143         */
144    
145        public SYSTEM_MODE getSystemMode() {
146            return systemMode;
147        }
148    
149        /**
150         * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
151         * new directory per Services initialization.
152         *
153         * @return the runtime directory of the Oozie instance.
154         */
155        public String getRuntimeDir() {
156            return runtimeDir;
157        }
158    
159        /**
160         * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
161         *
162         * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
163         */
164        public String getSystemId() {
165            return systemId;
166        }
167    
168        /**
169         * Set and set system mode.
170         *
171         * @param sysMode system mode
172         */
173    
174        public synchronized void setSystemMode(SYSTEM_MODE sysMode) {
175            if (this.systemMode != sysMode) {
176                XLog log = XLog.getLog(getClass());
177                log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode);
178            }
179            this.systemMode = sysMode;
180        }
181    
182        /**
183         * Return the services configuration.
184         *
185         * @return services configuraiton.
186         */
187        public Configuration getConf() {
188            return conf;
189        }
190    
191        /**
192         * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property.
193         *
194         * @throws ServiceException thrown if any of the services could not initialize.
195         */
196        @SuppressWarnings("unchecked")
197        public void init() throws ServiceException {
198            XLog log = new XLog(LogFactory.getLog(getClass()));
199            log.trace("Initializing");
200            SERVICES = this;
201            try {
202                Class<? extends Service>[] serviceClasses = (Class<? extends Service>[]) conf.getClasses(
203                        CONF_SERVICE_CLASSES);
204                if (serviceClasses != null) {
205                    for (Class<? extends Service> serviceClass : serviceClasses) {
206                        setService(serviceClass);
207                    }
208                }
209                serviceClasses = (Class<? extends Service>[]) conf.getClasses(CONF_SERVICE_EXT_CLASSES);
210                if (serviceClasses != null) {
211                    for (Class<? extends Service> serviceClass : serviceClasses) {
212                        setService(serviceClass);
213                    }
214                }
215            }
216            catch (RuntimeException ex) {
217                XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
218                throw ex;
219            }
220            catch (ServiceException ex) {
221                SERVICES = null;
222                throw ex;
223            }
224            InstrumentationService instrService = get(InstrumentationService.class);
225            if (instrService != null) {
226                for (Service service : services.values()) {
227                    if (service instanceof Instrumentable) {
228                        ((Instrumentable) service).instrument(instrService.get());
229                    }
230                }
231            }
232            log.info("Initialized");
233            log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion());
234            log.info("Oozie System ID [{0}] started!", getSystemId());
235        }
236    
237        /**
238         * Destroy all services.
239         */
240        public void destroy() {
241            XLog log = new XLog(LogFactory.getLog(getClass()));
242            log.trace("Shutting down");
243            boolean deleteRuntimeDir = false;
244            if (conf != null) {
245                deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false);
246            }
247            if (services != null) {
248                List<Service> list = new ArrayList<Service>(services.values());
249                Collections.reverse(list);
250                for (Service service : list) {
251                    try {
252                        log.trace("Destroying service[{0}]", service.getInterface());
253                        if (service.getInterface() == XLogService.class) {
254                            log.info("Shutdown");
255                        }
256                        service.destroy();
257                    }
258                    catch (Throwable ex) {
259                        log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex);
260                    }
261                }
262            }
263            if (deleteRuntimeDir) {
264                try {
265                    IOUtils.delete(new File(runtimeDir));
266                }
267                catch (IOException ex) {
268                    log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex);
269                }
270            }
271            services = null;
272            conf = null;
273            SERVICES = null;
274        }
275    
276        /**
277         * Return a service by its public interface.
278         *
279         * @param serviceKlass service public interface.
280         * @return the associated service, or <code>null</code> if not define.
281         */
282        @SuppressWarnings("unchecked")
283        public <T extends Service> T get(Class<T> serviceKlass) {
284            return (T) services.get(serviceKlass);
285        }
286    
287        /**
288         * Set a service programmatically. <p/> The service will be initialized by the services. <p/> If a service is
289         * already defined with the same public interface it will be destroyed.
290         *
291         * @param klass service klass
292         * @throws ServiceException if the service could not be initialized, at this point all services have been
293         * destroyed.
294         */
295        public void setService(Class<? extends Service> klass) throws ServiceException {
296            setServiceInternal(klass, true);
297        }
298    
299        private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException {
300            try {
301                Service newService = (Service) ReflectionUtils.newInstance(klass, null);
302                Service oldService = services.get(newService.getInterface());
303                if (oldService != null) {
304                    oldService.destroy();
305                }
306                if (logging) {
307                    XLog log = new XLog(LogFactory.getLog(getClass()));
308                    log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass());
309                }
310                newService.init(this);
311                services.put(newService.getInterface(), newService);
312            }
313            catch (ServiceException ex) {
314                XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
315                destroy();
316                throw ex;
317            }
318        }
319    
320        /**
321         * Return the services singleton.
322         *
323         * @return services singleton, <code>null</code> if not initialized.
324         */
325        public static Services get() {
326            return SERVICES;
327        }
328    
329    }