001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import org.apache.commons.logging.LogFactory; 021 import org.apache.hadoop.conf.Configuration; 022 import org.apache.hadoop.util.ReflectionUtils; 023 import org.apache.hadoop.util.VersionInfo; 024 import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 025 import org.apache.oozie.util.XLog; 026 import org.apache.oozie.util.Instrumentable; 027 import org.apache.oozie.util.IOUtils; 028 import org.apache.oozie.ErrorCode; 029 030 import java.util.ArrayList; 031 import java.util.Collections; 032 import java.util.LinkedHashMap; 033 import java.util.List; 034 import java.util.Map; 035 import java.io.IOException; 036 import java.io.File; 037 038 /** 039 * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p/> It has 2 built in 040 * services: {@link XLogService} and {@link ConfigurationService}. <p/> The rest of the services are loaded from the 041 * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces 042 * and enters are allowed). <p/> The {@link #CONF_SYSTEM_MODE} configuration property is any of 043 * NORMAL/SAFEMODE/NOWEBSERVICE. <p/> Services are loaded and initialized in the order they are defined in the in 044 * configuration property. <p/> After all services are initialized, if the Instrumentation service is present, all 045 * services that implement the {@link Instrumentable} are instrumented. <p/> Services are destroyed in reverse order. 046 * <p/> If services initialization fail, initialized services are immediatly destroyed. 047 */ 048 public class Services { 049 private static final int MAX_SYSTEM_ID_LEN = 10; 050 051 /** 052 * Environment variable that indicates the location of the Oozie home directory. 053 * The Oozie home directory is used to pick up the conf/ directory from 054 */ 055 public static final String OOZIE_HOME_DIR = "oozie.home.dir"; 056 057 public static final String CONF_SYSTEM_ID = "oozie.system.id"; 058 059 public static final String CONF_SERVICE_CLASSES = "oozie.services"; 060 061 public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext"; 062 063 public static final String CONF_SYSTEM_MODE = "oozie.systemmode"; 064 065 public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown"; 066 067 private static Services SERVICES; 068 069 private SYSTEM_MODE systemMode; 070 private String runtimeDir; 071 private Configuration conf; 072 private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>(); 073 private String systemId; 074 private static String oozieHome; 075 076 public static void setOozieHome() throws ServiceException { 077 oozieHome = System.getProperty(OOZIE_HOME_DIR); 078 if (oozieHome == null) { 079 throw new ServiceException(ErrorCode.E0000); 080 } 081 if (!oozieHome.startsWith("/")) { 082 throw new ServiceException(ErrorCode.E0003, oozieHome); 083 } 084 File file = new File(oozieHome); 085 if (!file.exists()) { 086 throw new ServiceException(ErrorCode.E0004, oozieHome); 087 } 088 } 089 090 public static String getOozieHome() throws ServiceException { 091 return oozieHome; 092 } 093 094 /** 095 * Create a services. <p/> The built in services are initialized. 096 * 097 * @throws ServiceException thrown if any of the built in services could not initialize. 098 */ 099 public Services() throws ServiceException { 100 setOozieHome(); 101 if (SERVICES != null) { 102 XLog log = XLog.getLog(getClass()); 103 log.warn(XLog.OPS, "Previous services singleton active, destroying it"); 104 SERVICES.destroy(); 105 SERVICES = null; 106 } 107 setServiceInternal(XLogService.class, false); 108 setServiceInternal(ConfigurationService.class, true); 109 conf = get(ConfigurationService.class).getConf(); 110 systemId = conf.get(CONF_SYSTEM_ID, ("oozie-" + System.getProperty("user.name"))); 111 if (systemId.length() > MAX_SYSTEM_ID_LEN) { 112 systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN); 113 XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId, 114 MAX_SYSTEM_ID_LEN); 115 } 116 setSystemMode(SYSTEM_MODE.valueOf(conf.get(CONF_SYSTEM_MODE, SYSTEM_MODE.NORMAL.toString()))); 117 runtimeDir = createRuntimeDir(); 118 } 119 120 private String createRuntimeDir() throws ServiceException { 121 try { 122 File file = File.createTempFile(getSystemId(), ".dir"); 123 file.delete(); 124 if (!file.mkdir()) { 125 ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath()); 126 XLog.getLog(getClass()).fatal(ex); 127 throw ex; 128 } 129 XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath()); 130 return file.getAbsolutePath(); 131 } 132 catch (IOException ex) { 133 ServiceException sex = new ServiceException(ErrorCode.E0001, ex); 134 XLog.getLog(getClass()).fatal(ex); 135 throw sex; 136 } 137 } 138 139 /** 140 * Return active system mode. <p/> . 141 * 142 * @return 143 */ 144 145 public SYSTEM_MODE getSystemMode() { 146 return systemMode; 147 } 148 149 /** 150 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a 151 * new directory per Services initialization. 152 * 153 * @return the runtime directory of the Oozie instance. 154 */ 155 public String getRuntimeDir() { 156 return runtimeDir; 157 } 158 159 /** 160 * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property. 161 * 162 * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property. 163 */ 164 public String getSystemId() { 165 return systemId; 166 } 167 168 /** 169 * Set and set system mode. 170 * 171 * @param sysMode system mode 172 */ 173 174 public synchronized void setSystemMode(SYSTEM_MODE sysMode) { 175 if (this.systemMode != sysMode) { 176 XLog log = XLog.getLog(getClass()); 177 log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode); 178 } 179 this.systemMode = sysMode; 180 } 181 182 /** 183 * Return the services configuration. 184 * 185 * @return services configuraiton. 186 */ 187 public Configuration getConf() { 188 return conf; 189 } 190 191 /** 192 * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property. 193 * 194 * @throws ServiceException thrown if any of the services could not initialize. 195 */ 196 @SuppressWarnings("unchecked") 197 public void init() throws ServiceException { 198 XLog log = new XLog(LogFactory.getLog(getClass())); 199 log.trace("Initializing"); 200 SERVICES = this; 201 try { 202 loadServices(); 203 } 204 catch (RuntimeException ex) { 205 XLog.getLog(getClass()).fatal(ex.getMessage(), ex); 206 throw ex; 207 } 208 catch (ServiceException ex) { 209 SERVICES = null; 210 throw ex; 211 } 212 InstrumentationService instrService = get(InstrumentationService.class); 213 if (instrService != null) { 214 for (Service service : services.values()) { 215 if (service instanceof Instrumentable) { 216 ((Instrumentable) service).instrument(instrService.get()); 217 } 218 } 219 } 220 log.info("Initialized"); 221 log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion()); 222 log.info("Oozie System ID [{0}] started!", getSystemId()); 223 } 224 225 /** 226 * Loads the specified services. 227 * 228 * @param classes services classes to load. 229 * @param list list of loaded service in order of appearance in the 230 * configuration. 231 * @throws ServiceException thrown if a service class could not be loaded. 232 */ 233 private void loadServices(Class[] classes, List<Service> list) throws ServiceException { 234 XLog log = new XLog(LogFactory.getLog(getClass())); 235 for (Class klass : classes) { 236 try { 237 Service service = (Service) klass.newInstance(); 238 log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(), 239 service.getClass()); 240 if (!service.getInterface().isInstance(service)) { 241 throw new ServiceException(ErrorCode.E0101, klass, service.getInterface().getName()); 242 } 243 list.add(service); 244 } catch (ServiceException ex) { 245 throw ex; 246 } catch (Exception ex) { 247 throw new ServiceException(ErrorCode.E0102, klass, ex.getMessage(), ex); 248 } 249 } 250 } 251 252 /** 253 * Loads services defined in <code>services</code> and 254 * <code>services.ext</code> and de-dups them. 255 * 256 * @return List of final services to initialize. 257 * @throws ServiceException throw if the services could not be loaded. 258 */ 259 private void loadServices() throws ServiceException { 260 XLog log = new XLog(LogFactory.getLog(getClass())); 261 try { 262 Map<Class, Service> map = new LinkedHashMap<Class, Service>(); 263 Class[] classes = conf.getClasses(CONF_SERVICE_CLASSES); 264 Class[] classesExt = conf.getClasses(CONF_SERVICE_EXT_CLASSES); 265 List<Service> list = new ArrayList<Service>(); 266 loadServices(classes, list); 267 loadServices(classesExt, list); 268 269 //removing duplicate services, strategy: last one wins 270 for (Service service : list) { 271 if (map.containsKey(service.getInterface())) { 272 log.debug("Replacing service [{0}] implementation [{1}]", service.getInterface(), 273 service.getClass()); 274 } 275 map.put(service.getInterface(), service); 276 } 277 for (Map.Entry<Class, Service> entry : map.entrySet()) { 278 setService(entry.getValue().getClass()); 279 } 280 } catch (RuntimeException ex) { 281 throw new ServiceException(ErrorCode.E0103, ex.getMessage(), ex); 282 } 283 } 284 285 /** 286 * Destroy all services. 287 */ 288 public void destroy() { 289 XLog log = new XLog(LogFactory.getLog(getClass())); 290 log.trace("Shutting down"); 291 boolean deleteRuntimeDir = false; 292 if (conf != null) { 293 deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false); 294 } 295 if (services != null) { 296 List<Service> list = new ArrayList<Service>(services.values()); 297 Collections.reverse(list); 298 for (Service service : list) { 299 try { 300 log.trace("Destroying service[{0}]", service.getInterface()); 301 if (service.getInterface() == XLogService.class) { 302 log.info("Shutdown"); 303 } 304 service.destroy(); 305 } 306 catch (Throwable ex) { 307 log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex); 308 } 309 } 310 } 311 if (deleteRuntimeDir) { 312 try { 313 IOUtils.delete(new File(runtimeDir)); 314 } 315 catch (IOException ex) { 316 log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex); 317 } 318 } 319 services = null; 320 conf = null; 321 SERVICES = null; 322 } 323 324 /** 325 * Return a service by its public interface. 326 * 327 * @param serviceKlass service public interface. 328 * @return the associated service, or <code>null</code> if not define. 329 */ 330 @SuppressWarnings("unchecked") 331 public <T extends Service> T get(Class<T> serviceKlass) { 332 return (T) services.get(serviceKlass); 333 } 334 335 /** 336 * Set a service programmatically. <p/> The service will be initialized by the services. <p/> If a service is 337 * already defined with the same public interface it will be destroyed. 338 * 339 * @param klass service klass 340 * @throws ServiceException if the service could not be initialized, at this point all services have been 341 * destroyed. 342 */ 343 public void setService(Class<? extends Service> klass) throws ServiceException { 344 setServiceInternal(klass, true); 345 } 346 347 private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException { 348 try { 349 Service newService = (Service) ReflectionUtils.newInstance(klass, null); 350 Service oldService = services.get(newService.getInterface()); 351 if (oldService != null) { 352 oldService.destroy(); 353 } 354 if (logging) { 355 XLog log = new XLog(LogFactory.getLog(getClass())); 356 log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass()); 357 } 358 newService.init(this); 359 services.put(newService.getInterface(), newService); 360 } 361 catch (ServiceException ex) { 362 XLog.getLog(getClass()).fatal(ex.getMessage(), ex); 363 destroy(); 364 throw ex; 365 } 366 } 367 368 /** 369 * Return the services singleton. 370 * 371 * @return services singleton, <code>null</code> if not initialized. 372 */ 373 public static Services get() { 374 return SERVICES; 375 } 376 377 }