001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import org.apache.commons.logging.LogFactory; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.util.ReflectionUtils; 024import org.apache.hadoop.util.VersionInfo; 025import org.apache.oozie.BuildInfo; 026import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 027import org.apache.oozie.util.DateUtils; 028import org.apache.oozie.util.XLog; 029import org.apache.oozie.util.Instrumentable; 030import org.apache.oozie.util.Instrumentation; 031import org.apache.oozie.util.IOUtils; 032import org.apache.oozie.ErrorCode; 033 034import java.util.ArrayList; 035import java.util.Collections; 036import java.util.LinkedHashMap; 037import java.util.List; 038import java.util.Map; 039import java.io.IOException; 040import java.io.File; 041 042/** 043 * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p> It has 2 built in 044 * services: {@link XLogService} and {@link ConfigurationService}. <p> The rest of the services are loaded from the 045 * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces 046 * and enters are allowed). <p> The {@link #CONF_SYSTEM_MODE} configuration property is any of 047 * NORMAL/SAFEMODE/NOWEBSERVICE. <p> Services are loaded and initialized in the order they are defined in the in 048 * configuration property. <p> After all services are initialized, if the Instrumentation service is present, all 049 * services that implement the {@link Instrumentable} are instrumented. <p> Services are destroyed in reverse order. 050 * <p> If services initialization fail, initialized services are immediatly destroyed. 051 */ 052public class Services { 053 private static final int MAX_SYSTEM_ID_LEN = 10; 054 055 /** 056 * Environment variable that indicates the location of the Oozie home directory. 057 * The Oozie home directory is used to pick up the conf/ directory from 058 */ 059 public static final String OOZIE_HOME_DIR = "oozie.home.dir"; 060 061 public static final String CONF_SYSTEM_ID = "oozie.system.id"; 062 063 public static final String CONF_SERVICE_CLASSES = "oozie.services"; 064 065 public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext"; 066 067 public static final String CONF_SYSTEM_MODE = "oozie.systemmode"; 068 069 public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown"; 070 071 private static Services SERVICES; 072 073 private SYSTEM_MODE systemMode; 074 private String runtimeDir; 075 private Configuration conf; 076 private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>(); 077 private String systemId; 078 private static String oozieHome; 079 080 public static void setOozieHome() throws ServiceException { 081 oozieHome = System.getProperty(OOZIE_HOME_DIR); 082 if (oozieHome == null) { 083 throw new ServiceException(ErrorCode.E0000); 084 } 085 File file = new File(oozieHome); 086 if (!file.isAbsolute()) { 087 throw new ServiceException(ErrorCode.E0003, oozieHome); 088 } 089 if (!file.exists()) { 090 throw new ServiceException(ErrorCode.E0004, oozieHome); 091 } 092 } 093 094 public static String getOozieHome() throws ServiceException { 095 return oozieHome; 096 } 097 098 /** 099 * Create a services. <p> The built in services are initialized. 100 * 101 * @throws ServiceException thrown if any of the built in services could not initialize. 102 */ 103 public Services() throws ServiceException { 104 setOozieHome(); 105 if (SERVICES != null) { 106 XLog log = XLog.getLog(getClass()); 107 log.warn(XLog.OPS, "Previous services singleton active, destroying it"); 108 SERVICES.destroy(); 109 SERVICES = null; 110 } 111 setServiceInternal(XLogService.class, false); 112 setServiceInternal(ConfigurationService.class, true); 113 conf = get(ConfigurationService.class).getConf(); 114 DateUtils.setConf(conf); 115 if (!DateUtils.getOozieProcessingTimeZone().equals(DateUtils.UTC)) { 116 XLog.getLog(getClass()).warn("Oozie configured to work in a timezone other than UTC: {0}", 117 DateUtils.getOozieProcessingTimeZone().getID()); 118 } 119 systemId = ConfigurationService.get(conf, CONF_SYSTEM_ID); 120 if (systemId.length() > MAX_SYSTEM_ID_LEN) { 121 systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN); 122 XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId, 123 MAX_SYSTEM_ID_LEN); 124 } 125 setSystemMode(SYSTEM_MODE.valueOf(ConfigurationService.get(conf, CONF_SYSTEM_MODE))); 126 runtimeDir = createRuntimeDir(); 127 } 128 129 private String createRuntimeDir() throws ServiceException { 130 try { 131 File file = File.createTempFile(getSystemId(), ".dir"); 132 file.delete(); 133 if (!file.mkdir()) { 134 ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath()); 135 XLog.getLog(getClass()).fatal(ex); 136 throw ex; 137 } 138 XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath()); 139 return file.getAbsolutePath(); 140 } 141 catch (IOException ex) { 142 ServiceException sex = new ServiceException(ErrorCode.E0001, "", ex); 143 XLog.getLog(getClass()).fatal(ex); 144 throw sex; 145 } 146 } 147 148 /** 149 * Return active system mode. <p> . 150 * 151 * @return systemMode returns active system mode 152 */ 153 154 public SYSTEM_MODE getSystemMode() { 155 return systemMode; 156 } 157 158 /** 159 * Return the runtime directory of the Oozie instance. <p> The directory is created under TMP and it is always a 160 * new directory per Services initialization. 161 * 162 * @return the runtime directory of the Oozie instance. 163 */ 164 public String getRuntimeDir() { 165 return runtimeDir; 166 } 167 168 /** 169 * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property. 170 * 171 * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property. 172 */ 173 public String getSystemId() { 174 return systemId; 175 } 176 177 /** 178 * Set and set system mode. 179 * 180 * @param sysMode system mode 181 */ 182 183 public synchronized void setSystemMode(SYSTEM_MODE sysMode) { 184 if (this.systemMode != sysMode) { 185 XLog log = XLog.getLog(getClass()); 186 log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode); 187 } 188 this.systemMode = sysMode; 189 } 190 191 /** 192 * Return the services configuration. 193 * 194 * @return services configuration. 195 * @deprecated Use {@link ConfigurationService#get(String)} to retrieve property from oozie configurations. 196 */ 197 @Deprecated 198 public Configuration getConf() { 199 return conf; 200 } 201 202 /** 203 * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property. 204 * 205 * @throws ServiceException thrown if any of the services could not initialize. 206 */ 207 public void init() throws ServiceException { 208 XLog log = new XLog(LogFactory.getLog(getClass())); 209 log.trace("Initializing"); 210 SERVICES = this; 211 try { 212 loadServices(); 213 } 214 catch (RuntimeException rex) { 215 XLog.getLog(getClass()).fatal(rex.getMessage(), rex); 216 throw rex; 217 } 218 catch (ServiceException ex) { 219 XLog.getLog(getClass()).fatal(ex.getMessage(), ex); 220 SERVICES = null; 221 throw ex; 222 } 223 InstrumentationService instrService = get(InstrumentationService.class); 224 if (instrService != null) { 225 Instrumentation instr = instrService.get(); 226 for (Service service : services.values()) { 227 if (service instanceof Instrumentable) { 228 ((Instrumentable) service).instrument(instr); 229 } 230 } 231 instr.addVariable("oozie", "version", new Instrumentation.Variable<String>() { 232 @Override 233 public String getValue() { 234 return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION); 235 } 236 }); 237 instr.addVariable("oozie", "mode", new Instrumentation.Variable<String>() { 238 @Override 239 public String getValue() { 240 return getSystemMode().toString(); 241 } 242 }); 243 } 244 log.info("Initialized"); 245 log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion()); 246 log.info("Oozie System ID [{0}] started!", getSystemId()); 247 } 248 249 /** 250 * Loads the specified services. 251 * 252 * @param classes services classes to load. 253 * @param list list of loaded service in order of appearance in the 254 * configuration. 255 * @throws ServiceException thrown if a service class could not be loaded. 256 */ 257 private void loadServices(Class<?>[] classes, List<Service> list) throws ServiceException { 258 XLog log = new XLog(LogFactory.getLog(getClass())); 259 for (Class<?> klass : classes) { 260 try { 261 Service service = (Service) klass.newInstance(); 262 log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(), 263 service.getClass()); 264 if (!service.getInterface().isInstance(service)) { 265 throw new ServiceException(ErrorCode.E0101, klass, service.getInterface().getName()); 266 } 267 list.add(service); 268 } catch (ServiceException ex) { 269 throw ex; 270 } catch (Exception ex) { 271 throw new ServiceException(ErrorCode.E0102, klass, ex.getMessage(), ex); 272 } 273 } 274 } 275 276 /** 277 * Loads services defined in <code>services</code> and 278 * <code>services.ext</code> and de-dups them. 279 * 280 * @return List of final services to initialize. 281 * @throws ServiceException throw if the services could not be loaded. 282 */ 283 private void loadServices() throws ServiceException { 284 XLog log = new XLog(LogFactory.getLog(getClass())); 285 try { 286 Map<Class<?>, Service> map = new LinkedHashMap<Class<?>, Service>(); 287 Class<?>[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES); 288 log.debug("Services list obtained from property '" + CONF_SERVICE_CLASSES + "'"); 289 Class<?>[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES); 290 log.debug("Services list obtained from property '" + CONF_SERVICE_EXT_CLASSES + "'"); 291 List<Service> list = new ArrayList<Service>(); 292 loadServices(classes, list); 293 loadServices(classesExt, list); 294 295 //removing duplicate services, strategy: last one wins 296 for (Service service : list) { 297 if (map.containsKey(service.getInterface())) { 298 log.debug("Replacing service [{0}] implementation [{1}]", service.getInterface(), 299 service.getClass()); 300 } 301 map.put(service.getInterface(), service); 302 } 303 for (Map.Entry<Class<?>, Service> entry : map.entrySet()) { 304 setService(entry.getValue().getClass()); 305 } 306 } catch (RuntimeException rex) { 307 log.fatal("Runtime Exception during Services Load. Check your list of '{0}' or '{1}'", 308 CONF_SERVICE_CLASSES, CONF_SERVICE_EXT_CLASSES, rex); 309 throw new ServiceException(ErrorCode.E0103, rex.getMessage(), rex); 310 } 311 } 312 313 /** 314 * Destroy all services. 315 */ 316 public void destroy() { 317 XLog log = new XLog(LogFactory.getLog(getClass())); 318 log.trace("Shutting down"); 319 boolean deleteRuntimeDir = false; 320 if (conf != null) { 321 deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false); 322 } 323 if (services != null) { 324 List<Service> list = new ArrayList<Service>(services.values()); 325 Collections.reverse(list); 326 for (Service service : list) { 327 try { 328 log.trace("Destroying service[{0}]", service.getInterface()); 329 if (service.getInterface() == XLogService.class) { 330 log.info("Shutdown"); 331 } 332 service.destroy(); 333 } 334 catch (Throwable ex) { 335 log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex); 336 } 337 } 338 } 339 if (deleteRuntimeDir) { 340 try { 341 IOUtils.delete(new File(runtimeDir)); 342 } 343 catch (IOException ex) { 344 log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex); 345 } 346 } 347 services = null; 348 conf = null; 349 SERVICES = null; 350 } 351 352 /** 353 * Return a service by its public interface. 354 * 355 * @param serviceKlass service public interface. 356 * @return the associated service, or <code>null</code> if not define. 357 */ 358 @SuppressWarnings("unchecked") 359 public <T extends Service> T get(Class<T> serviceKlass) { 360 return (T) services.get(serviceKlass); 361 } 362 363 /** 364 * Set a service programmatically. <p> The service will be initialized by the services. <p> If a service is 365 * already defined with the same public interface it will be destroyed. 366 * 367 * @param klass service klass 368 * @throws ServiceException if the service could not be initialized, at this point all services have been 369 * destroyed. 370 */ 371 public void setService(Class<? extends Service> klass) throws ServiceException { 372 setServiceInternal(klass, true); 373 } 374 375 private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException { 376 try { 377 Service newService = (Service) ReflectionUtils.newInstance(klass, null); 378 Service oldService = services.get(newService.getInterface()); 379 if (oldService != null) { 380 oldService.destroy(); 381 } 382 if (logging) { 383 XLog log = new XLog(LogFactory.getLog(getClass())); 384 log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass()); 385 } 386 newService.init(this); 387 services.put(newService.getInterface(), newService); 388 } 389 catch (ServiceException ex) { 390 XLog.getLog(getClass()).fatal(ex.getMessage(), ex); 391 destroy(); 392 throw ex; 393 } 394 } 395 396 /** 397 * Return the services singleton. 398 * 399 * @return services singleton, <code>null</code> if not initialized. 400 */ 401 public static Services get() { 402 return SERVICES; 403 } 404 405}