001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.Properties;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.ErrorCode;
028import org.apache.oozie.AppType;
029import org.apache.oozie.client.event.SLAEvent;
030import org.apache.oozie.event.WorkflowJobEvent;
031import org.apache.oozie.executor.jpa.BundleJobGetForUserJPAExecutor;
032import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserJPAExecutor;
033import org.apache.oozie.executor.jpa.JPAExecutorException;
034import org.apache.oozie.executor.jpa.WorkflowJobGetForUserJPAExecutor;
035import org.apache.oozie.util.XLog;
036
037
038/**
039 * JMS Topic service to retrieve topic names from events or job id
040 *
041 */
042public class JMSTopicService implements Service {
043
044    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSTopicService.";
045    public static final String TOPIC_NAME = CONF_PREFIX + "topic.name";
046    public static final String TOPIC_PREFIX = CONF_PREFIX + "topic.prefix";
047    private static XLog LOG;
048    private Configuration conf;
049    private final Map<String, String> topicMap = new HashMap<String, String>();
050    private static final List<String> JOB_TYPE_CONSTANTS = new ArrayList<String>();
051    private static final List<String> ALLOWED_TOPIC_NAMES = new ArrayList<String>();
052    private JPAService jpaService = Services.get().get(JPAService.class);
053    private String defaultTopicName = TopicType.USER.value;
054    private String topicPrefix;
055
056    static {
057        ALLOWED_TOPIC_NAMES.add(TopicType.USER.value);
058        ALLOWED_TOPIC_NAMES.add(TopicType.JOBID.value);
059    }
060
061    static {
062        JOB_TYPE_CONSTANTS.add(JobType.WORKFLOW.value);
063        JOB_TYPE_CONSTANTS.add(JobType.COORDINATOR.value);
064        JOB_TYPE_CONSTANTS.add(JobType.BUNDLE.value);
065    }
066
067    public static enum JobType {
068        WORKFLOW("WORKFLOW"), COORDINATOR("COORDINATOR"), BUNDLE("BUNDLE");
069        private String value;
070
071        JobType(String value) {
072            this.value = value;
073        }
074
075        String getValue() {
076            return value;
077        }
078
079    }
080
081    public static enum TopicType {
082        USER("${username}"), JOBID("${jobId}");
083
084        private String value;
085
086        TopicType(String value) {
087            this.value = value;
088        }
089
090        String getValue() {
091            return value;
092        }
093
094    }
095
096    @Override
097    public void init(Services services) throws ServiceException {
098        LOG = XLog.getLog(getClass());
099        conf = services.getConf();
100        parseTopicConfiguration();
101        topicPrefix = conf.get(TOPIC_PREFIX, "");
102    }
103
104    private void parseTopicConfiguration() throws ServiceException {
105        String topicName = conf.get(TOPIC_NAME, "default=" + TopicType.USER.value);
106        if (topicName == null) {
107            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic cannot be null ");
108        }
109        LOG.info("Topic Name is [{0}]", topicName);
110        String[] topic = topicName.trim().split(",");
111        for (int i = 0; i < topic.length; i++) {
112            String[] split = topic[i].trim().split("=");
113            if (split.length == 2) {
114                split[0] = split[0].trim();
115                split[1] = split[1].trim();
116                if (split[0].equals("default")) {
117                    if (!ALLOWED_TOPIC_NAMES.contains(split[1])) {
118                        throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Topic name " + split[1]
119                                + " not allowed in default; allowed" + "topics are " + ALLOWED_TOPIC_NAMES);
120                    }
121                    defaultTopicName = split[1];
122                }
123                else if (!JOB_TYPE_CONSTANTS.contains(split[0])) {
124                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
125                            "Incorrect job type for defining JMS topic: " + split[0] + " ;" + "allowed job types are "
126                                    + JOB_TYPE_CONSTANTS);
127                }
128                else if (!ALLOWED_TOPIC_NAMES.contains(split[1]) && split[1].contains("$")) {
129                    throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic value " + split[1]
130                            + " " + "for a job type is incorrect " + "Correct values are " + ALLOWED_TOPIC_NAMES);
131                }
132                else {
133                    topicMap.put(split[0], split[1]);
134                }
135            }
136            else {
137                throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Property " + topic[i]
138                        + "has incorrect syntax; It should be specified as key value pair");
139            }
140        }
141    }
142
143    /**
144     * Retrieve topic from Job id
145     * @param jobId
146     * @return
147     * @throws JPAExecutorException
148     */
149    public String getTopic(String jobId) throws JPAExecutorException {
150        String topicName = null;
151        if (jobId.contains("-W@")) {
152            jobId = jobId.substring(0, jobId.indexOf('@'));
153            topicName = getTopicForWorkflow(jobId);
154        }
155        else if (jobId.endsWith("-W")) {
156            topicName = getTopicForWorkflow(jobId);
157        }
158        else if (jobId.contains("-C@")) {
159            jobId = jobId.substring(0, jobId.indexOf('@'));
160            topicName = getTopicForCoordinator(jobId);
161        }
162        else if (jobId.endsWith("C")) {
163            topicName = getTopicForCoordinator(jobId);
164        }
165        else if (jobId.contains("-B_")) {
166            jobId = jobId.substring(0, jobId.indexOf('_'));
167            topicName = getTopicForBundle(jobId);
168        }
169
170        else if (jobId.endsWith("B")) {
171            topicName = getTopicForBundle(jobId);
172        }
173        return topicPrefix + topicName;
174    }
175
176    /**
177     * Retrieve Topic
178     *
179     * @param appType
180     * @param user
181     * @param jobId
182     * @param parentJobId
183     * @return topicName
184     */
185
186    public String getTopic(AppType appType, String user, String jobId, String parentJobId) {
187        String topicName = null;
188        String id = jobId;
189        if (appType == AppType.COORDINATOR_JOB || appType == AppType.COORDINATOR_ACTION) {
190            topicName = topicMap.get(JobType.COORDINATOR.value);
191            if (appType == AppType.COORDINATOR_ACTION) {
192                id = parentJobId;
193            }
194        }
195        else if (appType == AppType.WORKFLOW_JOB || appType == AppType.WORKFLOW_ACTION) {
196            topicName = topicMap.get(JobType.WORKFLOW.value);
197            if (appType == AppType.WORKFLOW_ACTION) {
198                id = parentJobId;
199            }
200        }
201        else if (appType == AppType.BUNDLE_JOB || appType == AppType.BUNDLE_ACTION) {
202            topicName = topicMap.get(JobType.BUNDLE.value);
203            if (appType == AppType.BUNDLE_ACTION) {
204                id = parentJobId;
205            }
206        }
207
208        if (topicName == null) {
209            if (defaultTopicName.equals(TopicType.USER.value)) {
210                topicName = user;
211            }
212            else if (defaultTopicName.equals(TopicType.JOBID.value)) {
213                topicName = id;
214            }
215        }
216        return topicPrefix + topicName;
217    }
218
219    private String getTopicForWorkflow(String jobId) throws JPAExecutorException {
220        String topicName = topicMap.get(JobType.WORKFLOW.value);
221        if (topicName == null) {
222            topicName = defaultTopicName;
223        }
224        if (topicName.equals(TopicType.USER.value)) {
225            topicName = jpaService.execute(new WorkflowJobGetForUserJPAExecutor(jobId));
226        }
227        else if (topicName.equals(TopicType.JOBID.value)) {
228            topicName = jobId;
229        }
230        return topicName;
231
232    }
233
234    private String getTopicForCoordinator(String jobId) throws JPAExecutorException {
235        String topicName = topicMap.get(JobType.COORDINATOR.value);
236
237        if (topicName == null) {
238            topicName = defaultTopicName;
239        }
240        if (topicName.equals(TopicType.USER.value)) {
241            topicName = jpaService.execute(new CoordinatorJobGetForUserJPAExecutor(jobId));
242        }
243        else if (topicName.equals(TopicType.JOBID.value)) {
244            topicName = jobId;
245        }
246        return topicName;
247    }
248
249    private String getTopicForBundle(String jobId) throws JPAExecutorException {
250        String topicName = topicMap.get(JobType.BUNDLE.value);
251        if (topicName == null) {
252            topicName = defaultTopicName;
253        }
254        if (topicName.equals(TopicType.USER.value)) {
255            topicName = jpaService.execute(new BundleJobGetForUserJPAExecutor(jobId));
256        }
257        else if (topicName.equals(TopicType.JOBID.value)) {
258            topicName = jobId;
259        }
260        return topicName;
261    }
262
263    public Properties getTopicPatternProperties() {
264        Properties props = new Properties();
265        String wfTopic = topicMap.get(JobType.WORKFLOW.value);
266        wfTopic = (wfTopic != null) ? wfTopic : defaultTopicName;
267        props.put(AppType.WORKFLOW_JOB, wfTopic);
268        props.put(AppType.WORKFLOW_ACTION, wfTopic);
269
270        String coordTopic = topicMap.get(JobType.COORDINATOR.value);
271        coordTopic = (coordTopic != null) ? coordTopic : defaultTopicName;
272        props.put(AppType.COORDINATOR_JOB, coordTopic);
273        props.put(AppType.COORDINATOR_ACTION, coordTopic);
274
275        String bundleTopic = topicMap.get(JobType.BUNDLE.value);
276        bundleTopic = (bundleTopic != null) ? bundleTopic : defaultTopicName;
277        props.put(AppType.BUNDLE_JOB, bundleTopic);
278        props.put(AppType.BUNDLE_ACTION, bundleTopic);
279
280        return props;
281    }
282
283    public String getTopicPrefix() {
284        return topicPrefix;
285    }
286
287    @Override
288    public void destroy() {
289        topicMap.clear();
290
291    }
292
293    @Override
294    public Class<? extends Service> getInterface() {
295        return JMSTopicService.class;
296    }
297
298}