001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import com.google.common.annotations.VisibleForTesting;
022import org.apache.hadoop.util.ReflectionUtils;
023import org.apache.oozie.action.ActionExecutor;
024import org.apache.oozie.action.control.EndActionExecutor;
025import org.apache.oozie.action.control.ForkActionExecutor;
026import org.apache.oozie.action.control.JoinActionExecutor;
027import org.apache.oozie.action.control.KillActionExecutor;
028import org.apache.oozie.action.control.StartActionExecutor;
029import org.apache.oozie.util.ParamChecker;
030import org.apache.oozie.util.XLog;
031import java.util.HashMap;
032import java.util.Map;
033import java.util.Set;
034import org.apache.oozie.util.Instrumentable;
035import org.apache.oozie.util.Instrumentation;
036
037public class ActionService implements Service, Instrumentable {
038
039    public static final String CONF_ACTION_EXECUTOR_CLASSES = CONF_PREFIX + "ActionService.executor.classes";
040
041    public static final String CONF_ACTION_EXECUTOR_EXT_CLASSES = CONF_PREFIX + "ActionService.executor.ext.classes";
042
043    private Services services;
044    private Map<String, Class<? extends ActionExecutor>> executors;
045    private static XLog LOG = XLog.getLog(ActionService.class);
046
047    @SuppressWarnings({"unchecked", "deprecation"})
048    @Override
049    public void init(Services services) throws ServiceException {
050        this.services = services;
051        ActionExecutor.enableInit();
052        ActionExecutor.resetInitInfo();
053        ActionExecutor.disableInit();
054        executors = new HashMap<String, Class<? extends ActionExecutor>>();
055
056        Class<? extends ActionExecutor>[] classes = new Class[] { StartActionExecutor.class,
057            EndActionExecutor.class, KillActionExecutor.class,  ForkActionExecutor.class, JoinActionExecutor.class };
058        registerExecutors(classes);
059
060        classes = (Class<? extends ActionExecutor>[]) ConfigurationService.getClasses
061                (services.getConf(), CONF_ACTION_EXECUTOR_CLASSES);
062        registerExecutors(classes);
063
064        classes = (Class<? extends ActionExecutor>[]) ConfigurationService.getClasses
065                (services.getConf(), CONF_ACTION_EXECUTOR_EXT_CLASSES);
066        registerExecutors(classes);
067
068        initExecutors();
069    }
070
071    private void registerExecutors(Class<? extends ActionExecutor>[] classes) {
072        if (classes != null) {
073            for (Class<? extends ActionExecutor> executorClass : classes) {
074                @SuppressWarnings("deprecation")
075                ActionExecutor executor = (ActionExecutor) ReflectionUtils.newInstance(executorClass, services.getConf());
076                executors.put(executor.getType(), executorClass);
077            }
078        }
079    }
080
081    private void initExecutors() {
082        for (Class<? extends ActionExecutor> executorClass : executors.values()) {
083            initExecutor(executorClass);
084        }
085        LOG.info("Initialized action types: " + getActionTypes());
086    }
087
088    @Override
089    public void destroy() {
090        ActionExecutor.enableInit();
091        ActionExecutor.resetInitInfo();
092        ActionExecutor.disableInit();
093        executors = null;
094    }
095
096    @Override
097    public Class<? extends Service> getInterface() {
098        return ActionService.class;
099    }
100
101    @Override
102    public void instrument(Instrumentation instr) {
103        instr.addVariable("configuration", "action.types", new Instrumentation.Variable<String>() {
104            @Override
105            public String getValue() {
106                Set<String> actionTypes = getActionTypes();
107                if (actionTypes != null) {
108                    return actionTypes.toString();
109                }
110                return "(unavailable)";
111            }
112        });
113    }
114
115    @SuppressWarnings("unchecked")
116    @VisibleForTesting
117    public void registerAndInitExecutor(Class<? extends ActionExecutor> klass) {
118        ActionExecutor.enableInit();
119        ActionExecutor.resetInitInfo();
120        ActionExecutor.disableInit();
121        registerExecutors(new Class[]{klass});
122        initExecutors();
123    }
124
125    private void initExecutor(Class<? extends ActionExecutor> klass) {
126        @SuppressWarnings("deprecation")
127        ActionExecutor executor = (ActionExecutor) ReflectionUtils.newInstance(klass, services.getConf());
128        LOG.debug("Initializing action type [{0}] class [{1}]", executor.getType(), klass);
129        ActionExecutor.enableInit();
130        executor.initActionType();
131        ActionExecutor.disableInit();
132        LOG.trace("Initialized Executor for action type [{0}] class [{1}]", executor.getType(), klass);
133    }
134
135    public ActionExecutor getExecutor(String actionType) {
136        ParamChecker.notEmpty(actionType, "actionType");
137        Class<? extends ActionExecutor> executorClass = executors.get(actionType);
138        return (executorClass != null) ? (ActionExecutor) ReflectionUtils.newInstance(executorClass, null) : null;
139    }
140
141    public boolean hasActionType(String actionType) {
142        ParamChecker.notEmpty(actionType, "actionType");
143        return executors.containsKey(actionType);
144    }
145
146    Set<String> getActionTypes() {
147        return executors.keySet();
148    }
149}