001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import org.apache.commons.logging.LogFactory; 021 import org.apache.hadoop.conf.Configuration; 022 import org.apache.hadoop.util.ReflectionUtils; 023 import org.apache.hadoop.util.VersionInfo; 024 import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 025 import org.apache.oozie.util.DateUtils; 026 import org.apache.oozie.util.XLog; 027 import org.apache.oozie.util.Instrumentable; 028 import org.apache.oozie.util.IOUtils; 029 import org.apache.oozie.ErrorCode; 030 031 import java.util.ArrayList; 032 import java.util.Collections; 033 import java.util.LinkedHashMap; 034 import java.util.List; 035 import java.util.Map; 036 import java.io.IOException; 037 import java.io.File; 038 039 /** 040 * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p/> It has 2 built in 041 * services: {@link XLogService} and {@link ConfigurationService}. <p/> The rest of the services are loaded from the 042 * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces 043 * and enters are allowed). <p/> The {@link #CONF_SYSTEM_MODE} configuration property is any of 044 * NORMAL/SAFEMODE/NOWEBSERVICE. <p/> Services are loaded and initialized in the order they are defined in the in 045 * configuration property. <p/> After all services are initialized, if the Instrumentation service is present, all 046 * services that implement the {@link Instrumentable} are instrumented. <p/> Services are destroyed in reverse order. 047 * <p/> If services initialization fail, initialized services are immediatly destroyed. 048 */ 049 public class Services { 050 private static final int MAX_SYSTEM_ID_LEN = 10; 051 052 /** 053 * Environment variable that indicates the location of the Oozie home directory. 054 * The Oozie home directory is used to pick up the conf/ directory from 055 */ 056 public static final String OOZIE_HOME_DIR = "oozie.home.dir"; 057 058 public static final String CONF_SYSTEM_ID = "oozie.system.id"; 059 060 public static final String CONF_SERVICE_CLASSES = "oozie.services"; 061 062 public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext"; 063 064 public static final String CONF_SYSTEM_MODE = "oozie.systemmode"; 065 066 public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown"; 067 068 private static Services SERVICES; 069 070 private SYSTEM_MODE systemMode; 071 private String runtimeDir; 072 private Configuration conf; 073 private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>(); 074 private String systemId; 075 private static String oozieHome; 076 077 public static void setOozieHome() throws ServiceException { 078 oozieHome = System.getProperty(OOZIE_HOME_DIR); 079 if (oozieHome == null) { 080 throw new ServiceException(ErrorCode.E0000); 081 } 082 if (!oozieHome.startsWith("/")) { 083 throw new ServiceException(ErrorCode.E0003, oozieHome); 084 } 085 File file = new File(oozieHome); 086 if (!file.exists()) { 087 throw new ServiceException(ErrorCode.E0004, oozieHome); 088 } 089 } 090 091 public static String getOozieHome() throws ServiceException { 092 return oozieHome; 093 } 094 095 /** 096 * Create a services. <p/> The built in services are initialized. 097 * 098 * @throws ServiceException thrown if any of the built in services could not initialize. 099 */ 100 public Services() throws ServiceException { 101 setOozieHome(); 102 if (SERVICES != null) { 103 XLog log = XLog.getLog(getClass()); 104 log.warn(XLog.OPS, "Previous services singleton active, destroying it"); 105 SERVICES.destroy(); 106 SERVICES = null; 107 } 108 setServiceInternal(XLogService.class, false); 109 setServiceInternal(ConfigurationService.class, true); 110 conf = get(ConfigurationService.class).getConf(); 111 DateUtils.setConf(conf); 112 if (!DateUtils.getOozieProcessingTimeZone().equals(DateUtils.UTC)) { 113 XLog.getLog(getClass()).warn("Oozie configured to work in a timezone other than UTC: {0}", 114 DateUtils.getOozieProcessingTimeZone().getID()); 115 } 116 systemId = conf.get(CONF_SYSTEM_ID, ("oozie-" + System.getProperty("user.name"))); 117 if (systemId.length() > MAX_SYSTEM_ID_LEN) { 118 systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN); 119 XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId, 120 MAX_SYSTEM_ID_LEN); 121 } 122 setSystemMode(SYSTEM_MODE.valueOf(conf.get(CONF_SYSTEM_MODE, SYSTEM_MODE.NORMAL.toString()))); 123 runtimeDir = createRuntimeDir(); 124 } 125 126 private String createRuntimeDir() throws ServiceException { 127 try { 128 File file = File.createTempFile(getSystemId(), ".dir"); 129 file.delete(); 130 if (!file.mkdir()) { 131 ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath()); 132 XLog.getLog(getClass()).fatal(ex); 133 throw ex; 134 } 135 XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath()); 136 return file.getAbsolutePath(); 137 } 138 catch (IOException ex) { 139 ServiceException sex = new ServiceException(ErrorCode.E0001, "", ex); 140 XLog.getLog(getClass()).fatal(ex); 141 throw sex; 142 } 143 } 144 145 /** 146 * Return active system mode. <p/> . 147 * 148 * @return 149 */ 150 151 public SYSTEM_MODE getSystemMode() { 152 return systemMode; 153 } 154 155 /** 156 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a 157 * new directory per Services initialization. 158 * 159 * @return the runtime directory of the Oozie instance. 160 */ 161 public String getRuntimeDir() { 162 return runtimeDir; 163 } 164 165 /** 166 * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property. 167 * 168 * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property. 169 */ 170 public String getSystemId() { 171 return systemId; 172 } 173 174 /** 175 * Set and set system mode. 176 * 177 * @param sysMode system mode 178 */ 179 180 public synchronized void setSystemMode(SYSTEM_MODE sysMode) { 181 if (this.systemMode != sysMode) { 182 XLog log = XLog.getLog(getClass()); 183 log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode); 184 } 185 this.systemMode = sysMode; 186 } 187 188 /** 189 * Return the services configuration. 190 * 191 * @return services configuraiton. 192 */ 193 public Configuration getConf() { 194 return conf; 195 } 196 197 /** 198 * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property. 199 * 200 * @throws ServiceException thrown if any of the services could not initialize. 201 */ 202 @SuppressWarnings("unchecked") 203 public void init() throws ServiceException { 204 XLog log = new XLog(LogFactory.getLog(getClass())); 205 log.trace("Initializing"); 206 SERVICES = this; 207 try { 208 loadServices(); 209 } 210 catch (RuntimeException rex) { 211 XLog.getLog(getClass()).fatal(rex.getMessage(), rex); 212 throw rex; 213 } 214 catch (ServiceException ex) { 215 XLog.getLog(getClass()).fatal(ex.getMessage(), ex); 216 SERVICES = null; 217 throw ex; 218 } 219 InstrumentationService instrService = get(InstrumentationService.class); 220 if (instrService != null) { 221 for (Service service : services.values()) { 222 if (service instanceof Instrumentable) { 223 ((Instrumentable) service).instrument(instrService.get()); 224 } 225 } 226 } 227 log.info("Initialized"); 228 log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion()); 229 log.info("Oozie System ID [{0}] started!", getSystemId()); 230 } 231 232 /** 233 * Loads the specified services. 234 * 235 * @param classes services classes to load. 236 * @param list list of loaded service in order of appearance in the 237 * configuration. 238 * @throws ServiceException thrown if a service class could not be loaded. 239 */ 240 private void loadServices(Class[] classes, List<Service> list) throws ServiceException { 241 XLog log = new XLog(LogFactory.getLog(getClass())); 242 for (Class klass : classes) { 243 try { 244 Service service = (Service) klass.newInstance(); 245 log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(), 246 service.getClass()); 247 if (!service.getInterface().isInstance(service)) { 248 throw new ServiceException(ErrorCode.E0101, klass, service.getInterface().getName()); 249 } 250 list.add(service); 251 } catch (ServiceException ex) { 252 throw ex; 253 } catch (Exception ex) { 254 throw new ServiceException(ErrorCode.E0102, klass, ex.getMessage(), ex); 255 } 256 } 257 } 258 259 /** 260 * Loads services defined in <code>services</code> and 261 * <code>services.ext</code> and de-dups them. 262 * 263 * @return List of final services to initialize. 264 * @throws ServiceException throw if the services could not be loaded. 265 */ 266 private void loadServices() throws ServiceException { 267 XLog log = new XLog(LogFactory.getLog(getClass())); 268 try { 269 Map<Class, Service> map = new LinkedHashMap<Class, Service>(); 270 Class[] classes = conf.getClasses(CONF_SERVICE_CLASSES); 271 log.debug("Services list obtained from property '" + CONF_SERVICE_CLASSES + "'"); 272 Class[] classesExt = conf.getClasses(CONF_SERVICE_EXT_CLASSES); 273 log.debug("Services list obtained from property '" + CONF_SERVICE_EXT_CLASSES + "'"); 274 List<Service> list = new ArrayList<Service>(); 275 loadServices(classes, list); 276 loadServices(classesExt, list); 277 278 //removing duplicate services, strategy: last one wins 279 for (Service service : list) { 280 if (map.containsKey(service.getInterface())) { 281 log.debug("Replacing service [{0}] implementation [{1}]", service.getInterface(), 282 service.getClass()); 283 } 284 map.put(service.getInterface(), service); 285 } 286 for (Map.Entry<Class, Service> entry : map.entrySet()) { 287 setService(entry.getValue().getClass()); 288 } 289 } catch (RuntimeException rex) { 290 log.fatal("Runtime Exception during Services Load. Check your list of '" + CONF_SERVICE_CLASSES + "' or '" + CONF_SERVICE_EXT_CLASSES + "'"); 291 throw new ServiceException(ErrorCode.E0103, rex.getMessage(), rex); 292 } 293 } 294 295 /** 296 * Destroy all services. 297 */ 298 public void destroy() { 299 XLog log = new XLog(LogFactory.getLog(getClass())); 300 log.trace("Shutting down"); 301 boolean deleteRuntimeDir = false; 302 if (conf != null) { 303 deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false); 304 } 305 if (services != null) { 306 List<Service> list = new ArrayList<Service>(services.values()); 307 Collections.reverse(list); 308 for (Service service : list) { 309 try { 310 log.trace("Destroying service[{0}]", service.getInterface()); 311 if (service.getInterface() == XLogService.class) { 312 log.info("Shutdown"); 313 } 314 service.destroy(); 315 } 316 catch (Throwable ex) { 317 log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex); 318 } 319 } 320 } 321 if (deleteRuntimeDir) { 322 try { 323 IOUtils.delete(new File(runtimeDir)); 324 } 325 catch (IOException ex) { 326 log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex); 327 } 328 } 329 services = null; 330 conf = null; 331 SERVICES = null; 332 } 333 334 /** 335 * Return a service by its public interface. 336 * 337 * @param serviceKlass service public interface. 338 * @return the associated service, or <code>null</code> if not define. 339 */ 340 @SuppressWarnings("unchecked") 341 public <T extends Service> T get(Class<T> serviceKlass) { 342 return (T) services.get(serviceKlass); 343 } 344 345 /** 346 * Set a service programmatically. <p/> The service will be initialized by the services. <p/> If a service is 347 * already defined with the same public interface it will be destroyed. 348 * 349 * @param klass service klass 350 * @throws ServiceException if the service could not be initialized, at this point all services have been 351 * destroyed. 352 */ 353 public void setService(Class<? extends Service> klass) throws ServiceException { 354 setServiceInternal(klass, true); 355 } 356 357 private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException { 358 try { 359 Service newService = (Service) ReflectionUtils.newInstance(klass, null); 360 Service oldService = services.get(newService.getInterface()); 361 if (oldService != null) { 362 oldService.destroy(); 363 } 364 if (logging) { 365 XLog log = new XLog(LogFactory.getLog(getClass())); 366 log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass()); 367 } 368 newService.init(this); 369 services.put(newService.getInterface(), newService); 370 } 371 catch (ServiceException ex) { 372 XLog.getLog(getClass()).fatal(ex.getMessage(), ex); 373 destroy(); 374 throw ex; 375 } 376 } 377 378 /** 379 * Return the services singleton. 380 * 381 * @return services singleton, <code>null</code> if not initialized. 382 */ 383 public static Services get() { 384 return SERVICES; 385 } 386 387 }