001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import com.google.common.annotations.VisibleForTesting; 022import org.apache.hadoop.util.ReflectionUtils; 023import org.apache.oozie.action.ActionExecutor; 024import org.apache.oozie.action.control.EndActionExecutor; 025import org.apache.oozie.action.control.ForkActionExecutor; 026import org.apache.oozie.action.control.JoinActionExecutor; 027import org.apache.oozie.action.control.KillActionExecutor; 028import org.apache.oozie.action.control.StartActionExecutor; 029import org.apache.oozie.util.ParamChecker; 030import org.apache.oozie.util.XLog; 031import java.util.HashMap; 032import java.util.Map; 033import java.util.Set; 034import org.apache.oozie.util.Instrumentable; 035import org.apache.oozie.util.Instrumentation; 036 037public class ActionService implements Service, Instrumentable { 038 039 public static final String CONF_ACTION_EXECUTOR_CLASSES = CONF_PREFIX + "ActionService.executor.classes"; 040 041 public static final String CONF_ACTION_EXECUTOR_EXT_CLASSES = CONF_PREFIX + "ActionService.executor.ext.classes"; 042 043 private Services services; 044 private Map<String, Class<? extends ActionExecutor>> executors; 045 private static XLog LOG = XLog.getLog(ActionService.class); 046 047 @SuppressWarnings({"unchecked", "deprecation"}) 048 @Override 049 public void init(Services services) throws ServiceException { 050 this.services = services; 051 ActionExecutor.enableInit(); 052 ActionExecutor.resetInitInfo(); 053 ActionExecutor.disableInit(); 054 executors = new HashMap<String, Class<? extends ActionExecutor>>(); 055 056 Class<? extends ActionExecutor>[] classes = new Class[] { StartActionExecutor.class, 057 EndActionExecutor.class, KillActionExecutor.class, ForkActionExecutor.class, JoinActionExecutor.class }; 058 registerExecutors(classes); 059 060 classes = (Class<? extends ActionExecutor>[]) ConfigurationService.getClasses 061 (services.getConf(), CONF_ACTION_EXECUTOR_CLASSES); 062 registerExecutors(classes); 063 064 classes = (Class<? extends ActionExecutor>[]) ConfigurationService.getClasses 065 (services.getConf(), CONF_ACTION_EXECUTOR_EXT_CLASSES); 066 registerExecutors(classes); 067 068 initExecutors(); 069 } 070 071 private void registerExecutors(Class<? extends ActionExecutor>[] classes) { 072 if (classes != null) { 073 for (Class<? extends ActionExecutor> executorClass : classes) { 074 @SuppressWarnings("deprecation") 075 ActionExecutor executor = (ActionExecutor) ReflectionUtils.newInstance(executorClass, services.getConf()); 076 executors.put(executor.getType(), executorClass); 077 } 078 } 079 } 080 081 private void initExecutors() { 082 for (Class<? extends ActionExecutor> executorClass : executors.values()) { 083 initExecutor(executorClass); 084 } 085 LOG.info("Initialized action types: " + getActionTypes()); 086 } 087 088 @Override 089 public void destroy() { 090 ActionExecutor.enableInit(); 091 ActionExecutor.resetInitInfo(); 092 ActionExecutor.disableInit(); 093 executors = null; 094 } 095 096 @Override 097 public Class<? extends Service> getInterface() { 098 return ActionService.class; 099 } 100 101 @Override 102 public void instrument(Instrumentation instr) { 103 instr.addVariable("configuration", "action.types", new Instrumentation.Variable<String>() { 104 @Override 105 public String getValue() { 106 Set<String> actionTypes = getActionTypes(); 107 if (actionTypes != null) { 108 return actionTypes.toString(); 109 } 110 return "(unavailable)"; 111 } 112 }); 113 } 114 115 @SuppressWarnings("unchecked") 116 @VisibleForTesting 117 public void registerAndInitExecutor(Class<? extends ActionExecutor> klass) { 118 ActionExecutor.enableInit(); 119 ActionExecutor.resetInitInfo(); 120 ActionExecutor.disableInit(); 121 registerExecutors(new Class[]{klass}); 122 initExecutors(); 123 } 124 125 private void initExecutor(Class<? extends ActionExecutor> klass) { 126 @SuppressWarnings("deprecation") 127 ActionExecutor executor = (ActionExecutor) ReflectionUtils.newInstance(klass, services.getConf()); 128 LOG.debug("Initializing action type [{0}] class [{1}]", executor.getType(), klass); 129 ActionExecutor.enableInit(); 130 executor.initActionType(); 131 ActionExecutor.disableInit(); 132 LOG.trace("Initialized Executor for action type [{0}] class [{1}]", executor.getType(), klass); 133 } 134 135 public ActionExecutor getExecutor(String actionType) { 136 ParamChecker.notEmpty(actionType, "actionType"); 137 Class<? extends ActionExecutor> executorClass = executors.get(actionType); 138 return (executorClass != null) ? (ActionExecutor) ReflectionUtils.newInstance(executorClass, null) : null; 139 } 140 141 public boolean hasActionType(String actionType) { 142 ParamChecker.notEmpty(actionType, "actionType"); 143 return executors.containsKey(actionType); 144 } 145 146 Set<String> getActionTypes() { 147 return executors.keySet(); 148 } 149}