001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import org.apache.commons.logging.LogFactory; 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.util.ReflectionUtils; 023import org.apache.hadoop.util.VersionInfo; 024import org.apache.oozie.BuildInfo; 025import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 026import org.apache.oozie.util.DateUtils; 027import org.apache.oozie.util.XLog; 028import org.apache.oozie.util.Instrumentable; 029import org.apache.oozie.util.Instrumentation; 030import org.apache.oozie.util.IOUtils; 031import org.apache.oozie.ErrorCode; 032 033import java.util.ArrayList; 034import java.util.Collections; 035import java.util.LinkedHashMap; 036import java.util.List; 037import java.util.Map; 038import java.io.IOException; 039import java.io.File; 040 041/** 042 * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p/> It has 2 built in 043 * services: {@link XLogService} and {@link ConfigurationService}. <p/> The rest of the services are loaded from the 044 * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces 045 * and enters are allowed). <p/> The {@link #CONF_SYSTEM_MODE} configuration property is any of 046 * NORMAL/SAFEMODE/NOWEBSERVICE. <p/> Services are loaded and initialized in the order they are defined in the in 047 * configuration property. <p/> After all services are initialized, if the Instrumentation service is present, all 048 * services that implement the {@link Instrumentable} are instrumented. <p/> Services are destroyed in reverse order. 049 * <p/> If services initialization fail, initialized services are immediatly destroyed. 050 */ 051public class Services { 052 private static final int MAX_SYSTEM_ID_LEN = 10; 053 054 /** 055 * Environment variable that indicates the location of the Oozie home directory. 056 * The Oozie home directory is used to pick up the conf/ directory from 057 */ 058 public static final String OOZIE_HOME_DIR = "oozie.home.dir"; 059 060 public static final String CONF_SYSTEM_ID = "oozie.system.id"; 061 062 public static final String CONF_SERVICE_CLASSES = "oozie.services"; 063 064 public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext"; 065 066 public static final String CONF_SYSTEM_MODE = "oozie.systemmode"; 067 068 public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown"; 069 070 private static Services SERVICES; 071 072 private SYSTEM_MODE systemMode; 073 private String runtimeDir; 074 private Configuration conf; 075 private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>(); 076 private String systemId; 077 private static String oozieHome; 078 079 public static void setOozieHome() throws ServiceException { 080 oozieHome = System.getProperty(OOZIE_HOME_DIR); 081 if (oozieHome == null) { 082 throw new ServiceException(ErrorCode.E0000); 083 } 084 File file = new File(oozieHome); 085 if (!file.isAbsolute()) { 086 throw new ServiceException(ErrorCode.E0003, oozieHome); 087 } 088 if (!file.exists()) { 089 throw new ServiceException(ErrorCode.E0004, oozieHome); 090 } 091 } 092 093 public static String getOozieHome() throws ServiceException { 094 return oozieHome; 095 } 096 097 /** 098 * Create a services. <p/> The built in services are initialized. 099 * 100 * @throws ServiceException thrown if any of the built in services could not initialize. 101 */ 102 public Services() throws ServiceException { 103 setOozieHome(); 104 if (SERVICES != null) { 105 XLog log = XLog.getLog(getClass()); 106 log.warn(XLog.OPS, "Previous services singleton active, destroying it"); 107 SERVICES.destroy(); 108 SERVICES = null; 109 } 110 setServiceInternal(XLogService.class, false); 111 setServiceInternal(ConfigurationService.class, true); 112 conf = get(ConfigurationService.class).getConf(); 113 DateUtils.setConf(conf); 114 if (!DateUtils.getOozieProcessingTimeZone().equals(DateUtils.UTC)) { 115 XLog.getLog(getClass()).warn("Oozie configured to work in a timezone other than UTC: {0}", 116 DateUtils.getOozieProcessingTimeZone().getID()); 117 } 118 systemId = conf.get(CONF_SYSTEM_ID, ("oozie-" + System.getProperty("user.name"))); 119 if (systemId.length() > MAX_SYSTEM_ID_LEN) { 120 systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN); 121 XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId, 122 MAX_SYSTEM_ID_LEN); 123 } 124 setSystemMode(SYSTEM_MODE.valueOf(conf.get(CONF_SYSTEM_MODE, SYSTEM_MODE.NORMAL.toString()))); 125 runtimeDir = createRuntimeDir(); 126 } 127 128 private String createRuntimeDir() throws ServiceException { 129 try { 130 File file = File.createTempFile(getSystemId(), ".dir"); 131 file.delete(); 132 if (!file.mkdir()) { 133 ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath()); 134 XLog.getLog(getClass()).fatal(ex); 135 throw ex; 136 } 137 XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath()); 138 return file.getAbsolutePath(); 139 } 140 catch (IOException ex) { 141 ServiceException sex = new ServiceException(ErrorCode.E0001, "", ex); 142 XLog.getLog(getClass()).fatal(ex); 143 throw sex; 144 } 145 } 146 147 /** 148 * Return active system mode. <p/> . 149 * 150 * @return 151 */ 152 153 public SYSTEM_MODE getSystemMode() { 154 return systemMode; 155 } 156 157 /** 158 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a 159 * new directory per Services initialization. 160 * 161 * @return the runtime directory of the Oozie instance. 162 */ 163 public String getRuntimeDir() { 164 return runtimeDir; 165 } 166 167 /** 168 * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property. 169 * 170 * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property. 171 */ 172 public String getSystemId() { 173 return systemId; 174 } 175 176 /** 177 * Set and set system mode. 178 * 179 * @param sysMode system mode 180 */ 181 182 public synchronized void setSystemMode(SYSTEM_MODE sysMode) { 183 if (this.systemMode != sysMode) { 184 XLog log = XLog.getLog(getClass()); 185 log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode); 186 } 187 this.systemMode = sysMode; 188 } 189 190 /** 191 * Return the services configuration. 192 * 193 * @return services configuraiton. 194 */ 195 public Configuration getConf() { 196 return conf; 197 } 198 199 /** 200 * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property. 201 * 202 * @throws ServiceException thrown if any of the services could not initialize. 203 */ 204 @SuppressWarnings("unchecked") 205 public void init() throws ServiceException { 206 XLog log = new XLog(LogFactory.getLog(getClass())); 207 log.trace("Initializing"); 208 SERVICES = this; 209 try { 210 loadServices(); 211 } 212 catch (RuntimeException rex) { 213 XLog.getLog(getClass()).fatal(rex.getMessage(), rex); 214 throw rex; 215 } 216 catch (ServiceException ex) { 217 XLog.getLog(getClass()).fatal(ex.getMessage(), ex); 218 SERVICES = null; 219 throw ex; 220 } 221 InstrumentationService instrService = get(InstrumentationService.class); 222 if (instrService != null) { 223 Instrumentation instr = instrService.get(); 224 for (Service service : services.values()) { 225 if (service instanceof Instrumentable) { 226 ((Instrumentable) service).instrument(instr); 227 } 228 } 229 instr.addVariable("oozie", "version", new Instrumentation.Variable<String>() { 230 @Override 231 public String getValue() { 232 return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION); 233 } 234 }); 235 instr.addVariable("oozie", "mode", new Instrumentation.Variable<String>() { 236 @Override 237 public String getValue() { 238 return getSystemMode().toString(); 239 } 240 }); 241 } 242 log.info("Initialized"); 243 log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion()); 244 log.info("Oozie System ID [{0}] started!", getSystemId()); 245 } 246 247 /** 248 * Loads the specified services. 249 * 250 * @param classes services classes to load. 251 * @param list list of loaded service in order of appearance in the 252 * configuration. 253 * @throws ServiceException thrown if a service class could not be loaded. 254 */ 255 private void loadServices(Class[] classes, List<Service> list) throws ServiceException { 256 XLog log = new XLog(LogFactory.getLog(getClass())); 257 for (Class klass : classes) { 258 try { 259 Service service = (Service) klass.newInstance(); 260 log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(), 261 service.getClass()); 262 if (!service.getInterface().isInstance(service)) { 263 throw new ServiceException(ErrorCode.E0101, klass, service.getInterface().getName()); 264 } 265 list.add(service); 266 } catch (ServiceException ex) { 267 throw ex; 268 } catch (Exception ex) { 269 throw new ServiceException(ErrorCode.E0102, klass, ex.getMessage(), ex); 270 } 271 } 272 } 273 274 /** 275 * Loads services defined in <code>services</code> and 276 * <code>services.ext</code> and de-dups them. 277 * 278 * @return List of final services to initialize. 279 * @throws ServiceException throw if the services could not be loaded. 280 */ 281 private void loadServices() throws ServiceException { 282 XLog log = new XLog(LogFactory.getLog(getClass())); 283 try { 284 Map<Class, Service> map = new LinkedHashMap<Class, Service>(); 285 Class[] classes = conf.getClasses(CONF_SERVICE_CLASSES); 286 log.debug("Services list obtained from property '" + CONF_SERVICE_CLASSES + "'"); 287 Class[] classesExt = conf.getClasses(CONF_SERVICE_EXT_CLASSES); 288 log.debug("Services list obtained from property '" + CONF_SERVICE_EXT_CLASSES + "'"); 289 List<Service> list = new ArrayList<Service>(); 290 loadServices(classes, list); 291 loadServices(classesExt, list); 292 293 //removing duplicate services, strategy: last one wins 294 for (Service service : list) { 295 if (map.containsKey(service.getInterface())) { 296 log.debug("Replacing service [{0}] implementation [{1}]", service.getInterface(), 297 service.getClass()); 298 } 299 map.put(service.getInterface(), service); 300 } 301 for (Map.Entry<Class, Service> entry : map.entrySet()) { 302 setService(entry.getValue().getClass()); 303 } 304 } catch (RuntimeException rex) { 305 log.fatal("Runtime Exception during Services Load. Check your list of '" + CONF_SERVICE_CLASSES + "' or '" + CONF_SERVICE_EXT_CLASSES + "'"); 306 throw new ServiceException(ErrorCode.E0103, rex.getMessage(), rex); 307 } 308 } 309 310 /** 311 * Destroy all services. 312 */ 313 public void destroy() { 314 XLog log = new XLog(LogFactory.getLog(getClass())); 315 log.trace("Shutting down"); 316 boolean deleteRuntimeDir = false; 317 if (conf != null) { 318 deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false); 319 } 320 if (services != null) { 321 List<Service> list = new ArrayList<Service>(services.values()); 322 Collections.reverse(list); 323 for (Service service : list) { 324 try { 325 log.trace("Destroying service[{0}]", service.getInterface()); 326 if (service.getInterface() == XLogService.class) { 327 log.info("Shutdown"); 328 } 329 service.destroy(); 330 } 331 catch (Throwable ex) { 332 log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex); 333 } 334 } 335 } 336 if (deleteRuntimeDir) { 337 try { 338 IOUtils.delete(new File(runtimeDir)); 339 } 340 catch (IOException ex) { 341 log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex); 342 } 343 } 344 services = null; 345 conf = null; 346 SERVICES = null; 347 } 348 349 /** 350 * Return a service by its public interface. 351 * 352 * @param serviceKlass service public interface. 353 * @return the associated service, or <code>null</code> if not define. 354 */ 355 @SuppressWarnings("unchecked") 356 public <T extends Service> T get(Class<T> serviceKlass) { 357 return (T) services.get(serviceKlass); 358 } 359 360 /** 361 * Set a service programmatically. <p/> The service will be initialized by the services. <p/> If a service is 362 * already defined with the same public interface it will be destroyed. 363 * 364 * @param klass service klass 365 * @throws ServiceException if the service could not be initialized, at this point all services have been 366 * destroyed. 367 */ 368 public void setService(Class<? extends Service> klass) throws ServiceException { 369 setServiceInternal(klass, true); 370 } 371 372 private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException { 373 try { 374 Service newService = (Service) ReflectionUtils.newInstance(klass, null); 375 Service oldService = services.get(newService.getInterface()); 376 if (oldService != null) { 377 oldService.destroy(); 378 } 379 if (logging) { 380 XLog log = new XLog(LogFactory.getLog(getClass())); 381 log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass()); 382 } 383 newService.init(this); 384 services.put(newService.getInterface(), newService); 385 } 386 catch (ServiceException ex) { 387 XLog.getLog(getClass()).fatal(ex.getMessage(), ex); 388 destroy(); 389 throw ex; 390 } 391 } 392 393 /** 394 * Return the services singleton. 395 * 396 * @return services singleton, <code>null</code> if not initialized. 397 */ 398 public static Services get() { 399 return SERVICES; 400 } 401 402}