This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import org.apache.commons.logging.LogFactory;
021 import org.apache.hadoop.conf.Configuration;
022 import org.apache.hadoop.util.ReflectionUtils;
023 import org.apache.hadoop.util.VersionInfo;
024 import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
025 import org.apache.oozie.util.DateUtils;
026 import org.apache.oozie.util.XLog;
027 import org.apache.oozie.util.Instrumentable;
028 import org.apache.oozie.util.IOUtils;
029 import org.apache.oozie.ErrorCode;
030
031 import java.util.ArrayList;
032 import java.util.Collections;
033 import java.util.LinkedHashMap;
034 import java.util.List;
035 import java.util.Map;
036 import java.io.IOException;
037 import java.io.File;
038
039 /**
040 * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p/> It has 2 built in
041 * services: {@link XLogService} and {@link ConfigurationService}. <p/> The rest of the services are loaded from the
042 * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces
043 * and enters are allowed). <p/> The {@link #CONF_SYSTEM_MODE} configuration property is any of
044 * NORMAL/SAFEMODE/NOWEBSERVICE. <p/> Services are loaded and initialized in the order they are defined in the in
045 * configuration property. <p/> After all services are initialized, if the Instrumentation service is present, all
046 * services that implement the {@link Instrumentable} are instrumented. <p/> Services are destroyed in reverse order.
047 * <p/> If services initialization fail, initialized services are immediatly destroyed.
048 */
049 public class Services {
050 private static final int MAX_SYSTEM_ID_LEN = 10;
051
052 /**
053 * Environment variable that indicates the location of the Oozie home directory.
054 * The Oozie home directory is used to pick up the conf/ directory from
055 */
056 public static final String OOZIE_HOME_DIR = "oozie.home.dir";
057
058 public static final String CONF_SYSTEM_ID = "oozie.system.id";
059
060 public static final String CONF_SERVICE_CLASSES = "oozie.services";
061
062 public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext";
063
064 public static final String CONF_SYSTEM_MODE = "oozie.systemmode";
065
066 public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown";
067
068 private static Services SERVICES;
069
070 private SYSTEM_MODE systemMode;
071 private String runtimeDir;
072 private Configuration conf;
073 private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>();
074 private String systemId;
075 private static String oozieHome;
076
077 public static void setOozieHome() throws ServiceException {
078 oozieHome = System.getProperty(OOZIE_HOME_DIR);
079 if (oozieHome == null) {
080 throw new ServiceException(ErrorCode.E0000);
081 }
082 if (!oozieHome.startsWith("/")) {
083 throw new ServiceException(ErrorCode.E0003, oozieHome);
084 }
085 File file = new File(oozieHome);
086 if (!file.exists()) {
087 throw new ServiceException(ErrorCode.E0004, oozieHome);
088 }
089 }
090
091 public static String getOozieHome() throws ServiceException {
092 return oozieHome;
093 }
094
095 /**
096 * Create a services. <p/> The built in services are initialized.
097 *
098 * @throws ServiceException thrown if any of the built in services could not initialize.
099 */
100 public Services() throws ServiceException {
101 setOozieHome();
102 if (SERVICES != null) {
103 XLog log = XLog.getLog(getClass());
104 log.warn(XLog.OPS, "Previous services singleton active, destroying it");
105 SERVICES.destroy();
106 SERVICES = null;
107 }
108 setServiceInternal(XLogService.class, false);
109 setServiceInternal(ConfigurationService.class, true);
110 conf = get(ConfigurationService.class).getConf();
111 DateUtils.setConf(conf);
112 if (!DateUtils.getOozieProcessingTimeZone().equals(DateUtils.UTC)) {
113 XLog.getLog(getClass()).warn("Oozie configured to work in a timezone other than UTC: {0}",
114 DateUtils.getOozieProcessingTimeZone().getID());
115 }
116 systemId = conf.get(CONF_SYSTEM_ID, ("oozie-" + System.getProperty("user.name")));
117 if (systemId.length() > MAX_SYSTEM_ID_LEN) {
118 systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN);
119 XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId,
120 MAX_SYSTEM_ID_LEN);
121 }
122 setSystemMode(SYSTEM_MODE.valueOf(conf.get(CONF_SYSTEM_MODE, SYSTEM_MODE.NORMAL.toString())));
123 runtimeDir = createRuntimeDir();
124 }
125
126 private String createRuntimeDir() throws ServiceException {
127 try {
128 File file = File.createTempFile(getSystemId(), ".dir");
129 file.delete();
130 if (!file.mkdir()) {
131 ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath());
132 XLog.getLog(getClass()).fatal(ex);
133 throw ex;
134 }
135 XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath());
136 return file.getAbsolutePath();
137 }
138 catch (IOException ex) {
139 ServiceException sex = new ServiceException(ErrorCode.E0001, "", ex);
140 XLog.getLog(getClass()).fatal(ex);
141 throw sex;
142 }
143 }
144
145 /**
146 * Return active system mode. <p/> .
147 *
148 * @return
149 */
150
151 public SYSTEM_MODE getSystemMode() {
152 return systemMode;
153 }
154
155 /**
156 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
157 * new directory per Services initialization.
158 *
159 * @return the runtime directory of the Oozie instance.
160 */
161 public String getRuntimeDir() {
162 return runtimeDir;
163 }
164
165 /**
166 * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
167 *
168 * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
169 */
170 public String getSystemId() {
171 return systemId;
172 }
173
174 /**
175 * Set and set system mode.
176 *
177 * @param sysMode system mode
178 */
179
180 public synchronized void setSystemMode(SYSTEM_MODE sysMode) {
181 if (this.systemMode != sysMode) {
182 XLog log = XLog.getLog(getClass());
183 log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode);
184 }
185 this.systemMode = sysMode;
186 }
187
188 /**
189 * Return the services configuration.
190 *
191 * @return services configuraiton.
192 */
193 public Configuration getConf() {
194 return conf;
195 }
196
197 /**
198 * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property.
199 *
200 * @throws ServiceException thrown if any of the services could not initialize.
201 */
202 @SuppressWarnings("unchecked")
203 public void init() throws ServiceException {
204 XLog log = new XLog(LogFactory.getLog(getClass()));
205 log.trace("Initializing");
206 SERVICES = this;
207 try {
208 loadServices();
209 }
210 catch (RuntimeException rex) {
211 XLog.getLog(getClass()).fatal(rex.getMessage(), rex);
212 throw rex;
213 }
214 catch (ServiceException ex) {
215 XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
216 SERVICES = null;
217 throw ex;
218 }
219 InstrumentationService instrService = get(InstrumentationService.class);
220 if (instrService != null) {
221 for (Service service : services.values()) {
222 if (service instanceof Instrumentable) {
223 ((Instrumentable) service).instrument(instrService.get());
224 }
225 }
226 }
227 log.info("Initialized");
228 log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion());
229 log.info("Oozie System ID [{0}] started!", getSystemId());
230 }
231
232 /**
233 * Loads the specified services.
234 *
235 * @param classes services classes to load.
236 * @param list list of loaded service in order of appearance in the
237 * configuration.
238 * @throws ServiceException thrown if a service class could not be loaded.
239 */
240 private void loadServices(Class[] classes, List<Service> list) throws ServiceException {
241 XLog log = new XLog(LogFactory.getLog(getClass()));
242 for (Class klass : classes) {
243 try {
244 Service service = (Service) klass.newInstance();
245 log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(),
246 service.getClass());
247 if (!service.getInterface().isInstance(service)) {
248 throw new ServiceException(ErrorCode.E0101, klass, service.getInterface().getName());
249 }
250 list.add(service);
251 } catch (ServiceException ex) {
252 throw ex;
253 } catch (Exception ex) {
254 throw new ServiceException(ErrorCode.E0102, klass, ex.getMessage(), ex);
255 }
256 }
257 }
258
259 /**
260 * Loads services defined in <code>services</code> and
261 * <code>services.ext</code> and de-dups them.
262 *
263 * @return List of final services to initialize.
264 * @throws ServiceException throw if the services could not be loaded.
265 */
266 private void loadServices() throws ServiceException {
267 XLog log = new XLog(LogFactory.getLog(getClass()));
268 try {
269 Map<Class, Service> map = new LinkedHashMap<Class, Service>();
270 Class[] classes = conf.getClasses(CONF_SERVICE_CLASSES);
271 log.debug("Services list obtained from property '" + CONF_SERVICE_CLASSES + "'");
272 Class[] classesExt = conf.getClasses(CONF_SERVICE_EXT_CLASSES);
273 log.debug("Services list obtained from property '" + CONF_SERVICE_EXT_CLASSES + "'");
274 List<Service> list = new ArrayList<Service>();
275 loadServices(classes, list);
276 loadServices(classesExt, list);
277
278 //removing duplicate services, strategy: last one wins
279 for (Service service : list) {
280 if (map.containsKey(service.getInterface())) {
281 log.debug("Replacing service [{0}] implementation [{1}]", service.getInterface(),
282 service.getClass());
283 }
284 map.put(service.getInterface(), service);
285 }
286 for (Map.Entry<Class, Service> entry : map.entrySet()) {
287 setService(entry.getValue().getClass());
288 }
289 } catch (RuntimeException rex) {
290 log.fatal("Runtime Exception during Services Load. Check your list of '" + CONF_SERVICE_CLASSES + "' or '" + CONF_SERVICE_EXT_CLASSES + "'");
291 throw new ServiceException(ErrorCode.E0103, rex.getMessage(), rex);
292 }
293 }
294
295 /**
296 * Destroy all services.
297 */
298 public void destroy() {
299 XLog log = new XLog(LogFactory.getLog(getClass()));
300 log.trace("Shutting down");
301 boolean deleteRuntimeDir = false;
302 if (conf != null) {
303 deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false);
304 }
305 if (services != null) {
306 List<Service> list = new ArrayList<Service>(services.values());
307 Collections.reverse(list);
308 for (Service service : list) {
309 try {
310 log.trace("Destroying service[{0}]", service.getInterface());
311 if (service.getInterface() == XLogService.class) {
312 log.info("Shutdown");
313 }
314 service.destroy();
315 }
316 catch (Throwable ex) {
317 log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex);
318 }
319 }
320 }
321 if (deleteRuntimeDir) {
322 try {
323 IOUtils.delete(new File(runtimeDir));
324 }
325 catch (IOException ex) {
326 log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex);
327 }
328 }
329 services = null;
330 conf = null;
331 SERVICES = null;
332 }
333
334 /**
335 * Return a service by its public interface.
336 *
337 * @param serviceKlass service public interface.
338 * @return the associated service, or <code>null</code> if not define.
339 */
340 @SuppressWarnings("unchecked")
341 public <T extends Service> T get(Class<T> serviceKlass) {
342 return (T) services.get(serviceKlass);
343 }
344
345 /**
346 * Set a service programmatically. <p/> The service will be initialized by the services. <p/> If a service is
347 * already defined with the same public interface it will be destroyed.
348 *
349 * @param klass service klass
350 * @throws ServiceException if the service could not be initialized, at this point all services have been
351 * destroyed.
352 */
353 public void setService(Class<? extends Service> klass) throws ServiceException {
354 setServiceInternal(klass, true);
355 }
356
357 private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException {
358 try {
359 Service newService = (Service) ReflectionUtils.newInstance(klass, null);
360 Service oldService = services.get(newService.getInterface());
361 if (oldService != null) {
362 oldService.destroy();
363 }
364 if (logging) {
365 XLog log = new XLog(LogFactory.getLog(getClass()));
366 log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass());
367 }
368 newService.init(this);
369 services.put(newService.getInterface(), newService);
370 }
371 catch (ServiceException ex) {
372 XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
373 destroy();
374 throw ex;
375 }
376 }
377
378 /**
379 * Return the services singleton.
380 *
381 * @return services singleton, <code>null</code> if not initialized.
382 */
383 public static Services get() {
384 return SERVICES;
385 }
386
387 }