001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Properties;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.oozie.ErrorCode;
029import org.apache.oozie.AppType;
030import org.apache.oozie.client.event.SLAEvent;
031import org.apache.oozie.event.WorkflowJobEvent;
032import org.apache.oozie.executor.jpa.BundleJobGetForUserJPAExecutor;
033import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserJPAExecutor;
034import org.apache.oozie.executor.jpa.JPAExecutorException;
035import org.apache.oozie.executor.jpa.WorkflowJobGetForUserJPAExecutor;
036import org.apache.oozie.util.XLog;
037
038
/**
 * JMS topic service that resolves the name of the JMS topic for a job, either from the
 * attributes of an event (application type, user, job id) or from a job id alone.
 */
043public class JMSTopicService implements Service {
044
045    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSTopicService.";
046    public static final String TOPIC_NAME = CONF_PREFIX + "topic.name";
047    public static final String TOPIC_PREFIX = CONF_PREFIX + "topic.prefix";
048    private static XLog LOG;
049    private Configuration conf;
050    private final Map<String, String> topicMap = new HashMap<String, String>();
051    private static final List<String> JOB_TYPE_CONSTANTS = new ArrayList<String>();
052    private static final List<String> ALLOWED_TOPIC_NAMES = new ArrayList<String>();
053    private JPAService jpaService = Services.get().get(JPAService.class);
054    private String defaultTopicName = TopicType.USER.value;
055    private String topicPrefix;
056
057    static {
058        ALLOWED_TOPIC_NAMES.add(TopicType.USER.value);
059        ALLOWED_TOPIC_NAMES.add(TopicType.JOBID.value);
060    }
061
062    static {
063        JOB_TYPE_CONSTANTS.add(JobType.WORKFLOW.value);
064        JOB_TYPE_CONSTANTS.add(JobType.COORDINATOR.value);
065        JOB_TYPE_CONSTANTS.add(JobType.BUNDLE.value);
066    }
067
068    public static enum JobType {
069        WORKFLOW("WORKFLOW"), COORDINATOR("COORDINATOR"), BUNDLE("BUNDLE");
070        private String value;
071
072        JobType(String value) {
073            this.value = value;
074        }
075
076        String getValue() {
077            return value;
078        }
079
080    }
081
082    public static enum TopicType {
083        USER("${username}"), JOBID("${jobId}");
084
085        private String value;
086
087        TopicType(String value) {
088            this.value = value;
089        }
090
091        String getValue() {
092            return value;
093        }
094
095    }
096
097    @Override
098    public void init(Services services) throws ServiceException {
099        LOG = XLog.getLog(getClass());
100        conf = services.getConf();
101        parseTopicConfiguration();
102        topicPrefix = conf.get(TOPIC_PREFIX, "");
103    }
104
105    private void parseTopicConfiguration() throws ServiceException {
106        String topicName = ConfigurationService.get(conf, TOPIC_NAME);
107        if (topicName == null) {
108            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic cannot be null ");
109        }
110        LOG.info("Topic Name is [{0}]", topicName);
111        String[] topic = topicName.trim().split(",");
112        for (int i = 0; i < topic.length; i++) {
113            String[] split = topic[i].trim().split("=");
114            if (split.length == 2) {
115                split[0] = split[0].trim();
116                split[1] = split[1].trim();
117                if (split[0].equals("default")) {
118                    if (!ALLOWED_TOPIC_NAMES.contains(split[1])) {
119                        throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Topic name " + split[1]
120                                + " not allowed in default; allowed" + "topics are " + ALLOWED_TOPIC_NAMES);
121                    }
122                    defaultTopicName = split[1];
123                }
124                else if (!JOB_TYPE_CONSTANTS.contains(split[0])) {
125                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
126                            "Incorrect job type for defining JMS topic: " + split[0] + " ;" + "allowed job types are "
127                                    + JOB_TYPE_CONSTANTS);
128                }
129                else if (!ALLOWED_TOPIC_NAMES.contains(split[1]) && split[1].contains("$")) {
130                    throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic value " + split[1]
131                            + " " + "for a job type is incorrect " + "Correct values are " + ALLOWED_TOPIC_NAMES);
132                }
133                else {
134                    topicMap.put(split[0], split[1]);
135                }
136            }
137            else {
138                throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Property " + topic[i]
139                        + "has incorrect syntax; It should be specified as key value pair");
140            }
141        }
142    }
143
144    /**
145     * Retrieve topic from Job id
146     * @param jobId
147     * @return topicPrefix + topicName Retrieve topic from Job id
148     * @throws JPAExecutorException
149     */
150    public String getTopic(String jobId) throws JPAExecutorException {
151        String topicName = null;
152        if (jobId.contains("-W@")) {
153            jobId = jobId.substring(0, jobId.indexOf('@'));
154            topicName = getTopicForWorkflow(jobId);
155        }
156        else if (jobId.endsWith("-W")) {
157            topicName = getTopicForWorkflow(jobId);
158        }
159        else if (jobId.contains("-C@")) {
160            jobId = jobId.substring(0, jobId.indexOf('@'));
161            topicName = getTopicForCoordinator(jobId);
162        }
163        else if (jobId.endsWith("C")) {
164            topicName = getTopicForCoordinator(jobId);
165        }
166        else if (jobId.contains("-B_")) {
167            jobId = jobId.substring(0, jobId.indexOf('_'));
168            topicName = getTopicForBundle(jobId);
169        }
170
171        else if (jobId.endsWith("B")) {
172            topicName = getTopicForBundle(jobId);
173        }
174        return topicPrefix + topicName;
175    }
176
177    /**
178     * Retrieve Topic
179     *
180     * @param appType
181     * @param user
182     * @param jobId
183     * @param parentJobId
184     * @return topicName
185     */
186
187    public String getTopic(AppType appType, String user, String jobId, String parentJobId) {
188        String topicName = null;
189        String id = jobId;
190        if (appType == AppType.COORDINATOR_JOB || appType == AppType.COORDINATOR_ACTION) {
191            topicName = topicMap.get(JobType.COORDINATOR.value);
192            if (appType == AppType.COORDINATOR_ACTION) {
193                id = parentJobId;
194            }
195        }
196        else if (appType == AppType.WORKFLOW_JOB || appType == AppType.WORKFLOW_ACTION) {
197            topicName = topicMap.get(JobType.WORKFLOW.value);
198            if (appType == AppType.WORKFLOW_ACTION) {
199                id = parentJobId;
200            }
201        }
202        else if (appType == AppType.BUNDLE_JOB || appType == AppType.BUNDLE_ACTION) {
203            topicName = topicMap.get(JobType.BUNDLE.value);
204            if (appType == AppType.BUNDLE_ACTION) {
205                id = parentJobId;
206            }
207        }
208
209        if (topicName == null) {
210            if (defaultTopicName.equals(TopicType.USER.value)) {
211                topicName = user;
212            }
213            else if (defaultTopicName.equals(TopicType.JOBID.value)) {
214                topicName = id;
215            }
216        }
217        return topicPrefix + topicName;
218    }
219
220    private String getTopicForWorkflow(String jobId) throws JPAExecutorException {
221        String topicName = topicMap.get(JobType.WORKFLOW.value);
222        if (topicName == null) {
223            topicName = defaultTopicName;
224        }
225        if (topicName.equals(TopicType.USER.value)) {
226            topicName = jpaService.execute(new WorkflowJobGetForUserJPAExecutor(jobId));
227        }
228        else if (topicName.equals(TopicType.JOBID.value)) {
229            topicName = jobId;
230        }
231        return topicName;
232
233    }
234
235    private String getTopicForCoordinator(String jobId) throws JPAExecutorException {
236        String topicName = topicMap.get(JobType.COORDINATOR.value);
237
238        if (topicName == null) {
239            topicName = defaultTopicName;
240        }
241        if (topicName.equals(TopicType.USER.value)) {
242            topicName = jpaService.execute(new CoordinatorJobGetForUserJPAExecutor(jobId));
243        }
244        else if (topicName.equals(TopicType.JOBID.value)) {
245            topicName = jobId;
246        }
247        return topicName;
248    }
249
250    private String getTopicForBundle(String jobId) throws JPAExecutorException {
251        String topicName = topicMap.get(JobType.BUNDLE.value);
252        if (topicName == null) {
253            topicName = defaultTopicName;
254        }
255        if (topicName.equals(TopicType.USER.value)) {
256            topicName = jpaService.execute(new BundleJobGetForUserJPAExecutor(jobId));
257        }
258        else if (topicName.equals(TopicType.JOBID.value)) {
259            topicName = jobId;
260        }
261        return topicName;
262    }
263
264    public Properties getTopicPatternProperties() {
265        Properties props = new Properties();
266        String wfTopic = topicMap.get(JobType.WORKFLOW.value);
267        wfTopic = (wfTopic != null) ? wfTopic : defaultTopicName;
268        props.put(AppType.WORKFLOW_JOB, wfTopic);
269        props.put(AppType.WORKFLOW_ACTION, wfTopic);
270
271        String coordTopic = topicMap.get(JobType.COORDINATOR.value);
272        coordTopic = (coordTopic != null) ? coordTopic : defaultTopicName;
273        props.put(AppType.COORDINATOR_JOB, coordTopic);
274        props.put(AppType.COORDINATOR_ACTION, coordTopic);
275
276        String bundleTopic = topicMap.get(JobType.BUNDLE.value);
277        bundleTopic = (bundleTopic != null) ? bundleTopic : defaultTopicName;
278        props.put(AppType.BUNDLE_JOB, bundleTopic);
279        props.put(AppType.BUNDLE_ACTION, bundleTopic);
280
281        return props;
282    }
283
284    public String getTopicPrefix() {
285        return topicPrefix;
286    }
287
288    @Override
289    public void destroy() {
290        topicMap.clear();
291
292    }
293
294    @Override
295    public Class<? extends Service> getInterface() {
296        return JMSTopicService.class;
297    }
298
299}