This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import org.apache.commons.logging.LogFactory;
021 import org.apache.hadoop.conf.Configuration;
022 import org.apache.hadoop.util.ReflectionUtils;
023 import org.apache.hadoop.util.VersionInfo;
024 import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
025 import org.apache.oozie.util.XLog;
026 import org.apache.oozie.util.Instrumentable;
027 import org.apache.oozie.util.IOUtils;
028 import org.apache.oozie.ErrorCode;
029
030 import java.util.ArrayList;
031 import java.util.Collections;
032 import java.util.LinkedHashMap;
033 import java.util.List;
034 import java.util.Map;
035 import java.io.IOException;
036 import java.io.File;
037
038 /**
039 * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p/> It has 2 built in
040 * services: {@link XLogService} and {@link ConfigurationService}. <p/> The rest of the services are loaded from the
041 * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces
042 * and enters are allowed). <p/> The {@link #CONF_SYSTEM_MODE} configuration property is any of
043 * NORMAL/SAFEMODE/NOWEBSERVICE. <p/> Services are loaded and initialized in the order they are defined in the in
044 * configuration property. <p/> After all services are initialized, if the Instrumentation service is present, all
045 * services that implement the {@link Instrumentable} are instrumented. <p/> Services are destroyed in reverse order.
046 * <p/> If services initialization fail, initialized services are immediatly destroyed.
047 */
048 public class Services {
049 private static final int MAX_SYSTEM_ID_LEN = 10;
050
051 /**
052 * Environment variable that indicates the location of the Oozie home directory.
053 * The Oozie home directory is used to pick up the conf/ directory from
054 */
055 public static final String OOZIE_HOME_DIR = "oozie.home.dir";
056
057 public static final String CONF_SYSTEM_ID = "oozie.system.id";
058
059 public static final String CONF_SERVICE_CLASSES = "oozie.services";
060
061 public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext";
062
063 public static final String CONF_SYSTEM_MODE = "oozie.systemmode";
064
065 public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown";
066
067 private static Services SERVICES;
068
069 private SYSTEM_MODE systemMode;
070 private String runtimeDir;
071 private Configuration conf;
072 private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>();
073 private String systemId;
074 private static String oozieHome;
075
076 public static void setOozieHome() throws ServiceException {
077 oozieHome = System.getProperty(OOZIE_HOME_DIR);
078 if (oozieHome == null) {
079 throw new ServiceException(ErrorCode.E0000);
080 }
081 if (!oozieHome.startsWith("/")) {
082 throw new ServiceException(ErrorCode.E0003, oozieHome);
083 }
084 File file = new File(oozieHome);
085 if (!file.exists()) {
086 throw new ServiceException(ErrorCode.E0004, oozieHome);
087 }
088 }
089
090 public static String getOozieHome() throws ServiceException {
091 return oozieHome;
092 }
093
094 /**
095 * Create a services. <p/> The built in services are initialized.
096 *
097 * @throws ServiceException thrown if any of the built in services could not initialize.
098 */
099 public Services() throws ServiceException {
100 setOozieHome();
101 if (SERVICES != null) {
102 XLog log = XLog.getLog(getClass());
103 log.warn(XLog.OPS, "Previous services singleton active, destroying it");
104 SERVICES.destroy();
105 SERVICES = null;
106 }
107 setServiceInternal(XLogService.class, false);
108 setServiceInternal(ConfigurationService.class, true);
109 conf = get(ConfigurationService.class).getConf();
110 systemId = conf.get(CONF_SYSTEM_ID, ("oozie-" + System.getProperty("user.name")));
111 if (systemId.length() > MAX_SYSTEM_ID_LEN) {
112 systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN);
113 XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId,
114 MAX_SYSTEM_ID_LEN);
115 }
116 setSystemMode(SYSTEM_MODE.valueOf(conf.get(CONF_SYSTEM_MODE, SYSTEM_MODE.NORMAL.toString())));
117 runtimeDir = createRuntimeDir();
118 }
119
120 private String createRuntimeDir() throws ServiceException {
121 try {
122 File file = File.createTempFile(getSystemId(), ".dir");
123 file.delete();
124 if (!file.mkdir()) {
125 ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath());
126 XLog.getLog(getClass()).fatal(ex);
127 throw ex;
128 }
129 XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath());
130 return file.getAbsolutePath();
131 }
132 catch (IOException ex) {
133 ServiceException sex = new ServiceException(ErrorCode.E0001, ex);
134 XLog.getLog(getClass()).fatal(ex);
135 throw sex;
136 }
137 }
138
139 /**
140 * Return active system mode. <p/> .
141 *
142 * @return
143 */
144
145 public SYSTEM_MODE getSystemMode() {
146 return systemMode;
147 }
148
149 /**
150 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
151 * new directory per Services initialization.
152 *
153 * @return the runtime directory of the Oozie instance.
154 */
155 public String getRuntimeDir() {
156 return runtimeDir;
157 }
158
159 /**
160 * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
161 *
162 * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
163 */
164 public String getSystemId() {
165 return systemId;
166 }
167
168 /**
169 * Set and set system mode.
170 *
171 * @param sysMode system mode
172 */
173
174 public synchronized void setSystemMode(SYSTEM_MODE sysMode) {
175 if (this.systemMode != sysMode) {
176 XLog log = XLog.getLog(getClass());
177 log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode);
178 }
179 this.systemMode = sysMode;
180 }
181
182 /**
183 * Return the services configuration.
184 *
185 * @return services configuraiton.
186 */
187 public Configuration getConf() {
188 return conf;
189 }
190
191 /**
192 * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property.
193 *
194 * @throws ServiceException thrown if any of the services could not initialize.
195 */
196 @SuppressWarnings("unchecked")
197 public void init() throws ServiceException {
198 XLog log = new XLog(LogFactory.getLog(getClass()));
199 log.trace("Initializing");
200 SERVICES = this;
201 try {
202 loadServices();
203 }
204 catch (RuntimeException ex) {
205 XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
206 throw ex;
207 }
208 catch (ServiceException ex) {
209 SERVICES = null;
210 throw ex;
211 }
212 InstrumentationService instrService = get(InstrumentationService.class);
213 if (instrService != null) {
214 for (Service service : services.values()) {
215 if (service instanceof Instrumentable) {
216 ((Instrumentable) service).instrument(instrService.get());
217 }
218 }
219 }
220 log.info("Initialized");
221 log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion());
222 log.info("Oozie System ID [{0}] started!", getSystemId());
223 }
224
225 /**
226 * Loads the specified services.
227 *
228 * @param classes services classes to load.
229 * @param list list of loaded service in order of appearance in the
230 * configuration.
231 * @throws ServiceException thrown if a service class could not be loaded.
232 */
233 private void loadServices(Class[] classes, List<Service> list) throws ServiceException {
234 XLog log = new XLog(LogFactory.getLog(getClass()));
235 for (Class klass : classes) {
236 try {
237 Service service = (Service) klass.newInstance();
238 log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(),
239 service.getClass());
240 if (!service.getInterface().isInstance(service)) {
241 throw new ServiceException(ErrorCode.E0101, klass, service.getInterface().getName());
242 }
243 list.add(service);
244 } catch (ServiceException ex) {
245 throw ex;
246 } catch (Exception ex) {
247 throw new ServiceException(ErrorCode.E0102, klass, ex.getMessage(), ex);
248 }
249 }
250 }
251
252 /**
253 * Loads services defined in <code>services</code> and
254 * <code>services.ext</code> and de-dups them.
255 *
256 * @return List of final services to initialize.
257 * @throws ServiceException throw if the services could not be loaded.
258 */
259 private void loadServices() throws ServiceException {
260 XLog log = new XLog(LogFactory.getLog(getClass()));
261 try {
262 Map<Class, Service> map = new LinkedHashMap<Class, Service>();
263 Class[] classes = conf.getClasses(CONF_SERVICE_CLASSES);
264 Class[] classesExt = conf.getClasses(CONF_SERVICE_EXT_CLASSES);
265 List<Service> list = new ArrayList<Service>();
266 loadServices(classes, list);
267 loadServices(classesExt, list);
268
269 //removing duplicate services, strategy: last one wins
270 for (Service service : list) {
271 if (map.containsKey(service.getInterface())) {
272 log.debug("Replacing service [{0}] implementation [{1}]", service.getInterface(),
273 service.getClass());
274 }
275 map.put(service.getInterface(), service);
276 }
277 for (Map.Entry<Class, Service> entry : map.entrySet()) {
278 setService(entry.getValue().getClass());
279 }
280 } catch (RuntimeException ex) {
281 throw new ServiceException(ErrorCode.E0103, ex.getMessage(), ex);
282 }
283 }
284
285 /**
286 * Destroy all services.
287 */
288 public void destroy() {
289 XLog log = new XLog(LogFactory.getLog(getClass()));
290 log.trace("Shutting down");
291 boolean deleteRuntimeDir = false;
292 if (conf != null) {
293 deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false);
294 }
295 if (services != null) {
296 List<Service> list = new ArrayList<Service>(services.values());
297 Collections.reverse(list);
298 for (Service service : list) {
299 try {
300 log.trace("Destroying service[{0}]", service.getInterface());
301 if (service.getInterface() == XLogService.class) {
302 log.info("Shutdown");
303 }
304 service.destroy();
305 }
306 catch (Throwable ex) {
307 log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex);
308 }
309 }
310 }
311 if (deleteRuntimeDir) {
312 try {
313 IOUtils.delete(new File(runtimeDir));
314 }
315 catch (IOException ex) {
316 log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex);
317 }
318 }
319 services = null;
320 conf = null;
321 SERVICES = null;
322 }
323
324 /**
325 * Return a service by its public interface.
326 *
327 * @param serviceKlass service public interface.
328 * @return the associated service, or <code>null</code> if not define.
329 */
330 @SuppressWarnings("unchecked")
331 public <T extends Service> T get(Class<T> serviceKlass) {
332 return (T) services.get(serviceKlass);
333 }
334
335 /**
336 * Set a service programmatically. <p/> The service will be initialized by the services. <p/> If a service is
337 * already defined with the same public interface it will be destroyed.
338 *
339 * @param klass service klass
340 * @throws ServiceException if the service could not be initialized, at this point all services have been
341 * destroyed.
342 */
343 public void setService(Class<? extends Service> klass) throws ServiceException {
344 setServiceInternal(klass, true);
345 }
346
347 private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException {
348 try {
349 Service newService = (Service) ReflectionUtils.newInstance(klass, null);
350 Service oldService = services.get(newService.getInterface());
351 if (oldService != null) {
352 oldService.destroy();
353 }
354 if (logging) {
355 XLog log = new XLog(LogFactory.getLog(getClass()));
356 log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass());
357 }
358 newService.init(this);
359 services.put(newService.getInterface(), newService);
360 }
361 catch (ServiceException ex) {
362 XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
363 destroy();
364 throw ex;
365 }
366 }
367
368 /**
369 * Return the services singleton.
370 *
371 * @return services singleton, <code>null</code> if not initialized.
372 */
373 public static Services get() {
374 return SERVICES;
375 }
376
377 }