001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Properties;
025    
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.oozie.ErrorCode;
028    import org.apache.oozie.AppType;
029    import org.apache.oozie.client.event.SLAEvent;
030    import org.apache.oozie.event.WorkflowJobEvent;
031    import org.apache.oozie.executor.jpa.BundleJobGetForUserJPAExecutor;
032    import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserJPAExecutor;
033    import org.apache.oozie.executor.jpa.JPAExecutorException;
034    import org.apache.oozie.executor.jpa.WorkflowJobGetForUserJPAExecutor;
035    import org.apache.oozie.util.XLog;
036    
037    
038    /**
039     * JMS Topic service to retrieve topic names from events or job id
040     *
041     */
042    public class JMSTopicService implements Service {
043    
044        public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSTopicService.";
045        public static final String TOPIC_NAME = CONF_PREFIX + "topic.name";
046        public static final String TOPIC_PREFIX = CONF_PREFIX + "topic.prefix";
047        private static XLog LOG;
048        private Configuration conf;
049        private final Map<String, String> topicMap = new HashMap<String, String>();
050        private static final List<String> JOB_TYPE_CONSTANTS = new ArrayList<String>();
051        private static final List<String> ALLOWED_TOPIC_NAMES = new ArrayList<String>();
052        private JPAService jpaService = Services.get().get(JPAService.class);
053        private String defaultTopicName = TopicType.USER.value;
054        private String topicPrefix;
055    
056        static {
057            ALLOWED_TOPIC_NAMES.add(TopicType.USER.value);
058            ALLOWED_TOPIC_NAMES.add(TopicType.JOBID.value);
059        }
060    
061        static {
062            JOB_TYPE_CONSTANTS.add(JobType.WORKFLOW.value);
063            JOB_TYPE_CONSTANTS.add(JobType.COORDINATOR.value);
064            JOB_TYPE_CONSTANTS.add(JobType.BUNDLE.value);
065        }
066    
067        public static enum JobType {
068            WORKFLOW("WORKFLOW"), COORDINATOR("COORDINATOR"), BUNDLE("BUNDLE");
069            private String value;
070    
071            JobType(String value) {
072                this.value = value;
073            }
074    
075            String getValue() {
076                return value;
077            }
078    
079        }
080    
081        public static enum TopicType {
082            USER("${username}"), JOBID("${jobId}");
083    
084            private String value;
085    
086            TopicType(String value) {
087                this.value = value;
088            }
089    
090            String getValue() {
091                return value;
092            }
093    
094        }
095    
096        @Override
097        public void init(Services services) throws ServiceException {
098            LOG = XLog.getLog(getClass());
099            conf = services.getConf();
100            parseTopicConfiguration();
101            topicPrefix = conf.get(TOPIC_PREFIX, "");
102        }
103    
104        private void parseTopicConfiguration() throws ServiceException {
105            String topicName = conf.get(TOPIC_NAME, "default=" + TopicType.USER.value);
106            if (topicName == null) {
107                throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic cannot be null ");
108            }
109            LOG.info("Topic Name is [{0}]", topicName);
110            String[] topic = topicName.trim().split(",");
111            for (int i = 0; i < topic.length; i++) {
112                String[] split = topic[i].trim().split("=");
113                if (split.length == 2) {
114                    split[0] = split[0].trim();
115                    split[1] = split[1].trim();
116                    if (split[0].equals("default")) {
117                        if (!ALLOWED_TOPIC_NAMES.contains(split[1])) {
118                            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Topic name " + split[1]
119                                    + " not allowed in default; allowed" + "topics are " + ALLOWED_TOPIC_NAMES);
120                        }
121                        defaultTopicName = split[1];
122                    }
123                    else if (!JOB_TYPE_CONSTANTS.contains(split[0])) {
124                        throw new ServiceException(ErrorCode.E0100, getClass().getName(),
125                                "Incorrect job type for defining JMS topic: " + split[0] + " ;" + "allowed job types are "
126                                        + JOB_TYPE_CONSTANTS);
127                    }
128                    else if (!ALLOWED_TOPIC_NAMES.contains(split[1]) && split[1].contains("$")) {
129                        throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic value " + split[1]
130                                + " " + "for a job type is incorrect " + "Correct values are " + ALLOWED_TOPIC_NAMES);
131                    }
132                    else {
133                        topicMap.put(split[0], split[1]);
134                    }
135                }
136                else {
137                    throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Property " + topic[i]
138                            + "has incorrect syntax; It should be specified as key value pair");
139                }
140            }
141        }
142    
143        /**
144         * Retrieve topic from Job id
145         * @param jobId
146         * @return
147         * @throws JPAExecutorException
148         */
149        public String getTopic(String jobId) throws JPAExecutorException {
150            String topicName = null;
151            if (jobId.contains("-W@")) {
152                jobId = jobId.substring(0, jobId.indexOf('@'));
153                topicName = getTopicForWorkflow(jobId);
154            }
155            else if (jobId.endsWith("-W")) {
156                topicName = getTopicForWorkflow(jobId);
157            }
158            else if (jobId.contains("-C@")) {
159                jobId = jobId.substring(0, jobId.indexOf('@'));
160                topicName = getTopicForCoordinator(jobId);
161            }
162            else if (jobId.endsWith("C")) {
163                topicName = getTopicForCoordinator(jobId);
164            }
165            else if (jobId.contains("-B_")) {
166                jobId = jobId.substring(0, jobId.indexOf('_'));
167                topicName = getTopicForBundle(jobId);
168            }
169    
170            else if (jobId.endsWith("B")) {
171                topicName = getTopicForBundle(jobId);
172            }
173            return topicPrefix + topicName;
174        }
175    
176        /**
177         * Retrieve Topic
178         *
179         * @param appType
180         * @param user
181         * @param jobId
182         * @param parentJobId
183         * @return topicName
184         */
185    
186        public String getTopic(AppType appType, String user, String jobId, String parentJobId) {
187            String topicName = null;
188            String id = jobId;
189            if (appType == AppType.COORDINATOR_JOB || appType == AppType.COORDINATOR_ACTION) {
190                topicName = topicMap.get(JobType.COORDINATOR.value);
191                if (appType == AppType.COORDINATOR_ACTION) {
192                    id = parentJobId;
193                }
194            }
195            else if (appType == AppType.WORKFLOW_JOB || appType == AppType.WORKFLOW_ACTION) {
196                topicName = topicMap.get(JobType.WORKFLOW);
197            }
198    
199            if (topicName == null) {
200                if (defaultTopicName.equals(TopicType.USER.value)) {
201                    topicName = user;
202                }
203                else if (defaultTopicName.equals(TopicType.JOBID.value)) {
204                    topicName = id;
205                }
206            }
207            return topicPrefix + topicName;
208        }
209    
210        private String getTopicForWorkflow(String jobId) throws JPAExecutorException {
211            String topicName = topicMap.get(JobType.WORKFLOW.value);
212            if (topicName == null) {
213                topicName = defaultTopicName;
214            }
215            if (topicName.equals(TopicType.USER.value)) {
216                topicName = jpaService.execute(new WorkflowJobGetForUserJPAExecutor(jobId));
217            }
218            else if (topicName.equals(TopicType.JOBID.value)) {
219                topicName = jobId;
220            }
221            return topicName;
222    
223        }
224    
225        private String getTopicForCoordinator(String jobId) throws JPAExecutorException {
226            String topicName = topicMap.get(JobType.COORDINATOR.value);
227    
228            if (topicName == null) {
229                topicName = defaultTopicName;
230            }
231            if (topicName.equals(TopicType.USER.value)) {
232                topicName = jpaService.execute(new CoordinatorJobGetForUserJPAExecutor(jobId));
233            }
234            else if (topicName.equals(TopicType.JOBID.value)) {
235                topicName = jobId;
236            }
237            return topicName;
238        }
239    
240        private String getTopicForBundle(String jobId) throws JPAExecutorException {
241            String topicName = topicMap.get(JobType.BUNDLE.value);
242            if (topicName == null) {
243                topicName = defaultTopicName;
244            }
245            if (topicName.equals(TopicType.USER.value)) {
246                topicName = jpaService.execute(new BundleJobGetForUserJPAExecutor(jobId));
247            }
248            else if (topicName.equals(TopicType.JOBID.value)) {
249                topicName = jobId;
250            }
251            return topicName;
252        }
253    
254        public Properties getTopicPatternProperties() {
255            Properties props = new Properties();
256            String wfTopic = topicMap.get(JobType.WORKFLOW.value);
257            wfTopic = (wfTopic != null) ? wfTopic : defaultTopicName;
258            props.put(AppType.WORKFLOW_JOB, wfTopic);
259            props.put(AppType.WORKFLOW_ACTION, wfTopic);
260    
261            String coordTopic = topicMap.get(JobType.COORDINATOR.value);
262            coordTopic = (coordTopic != null) ? coordTopic : defaultTopicName;
263            props.put(AppType.COORDINATOR_JOB, coordTopic);
264            props.put(AppType.COORDINATOR_ACTION, coordTopic);
265    
266            String bundleTopic = topicMap.get(JobType.BUNDLE.value);
267            bundleTopic = (bundleTopic != null) ? bundleTopic : defaultTopicName;
268            props.put(AppType.BUNDLE_JOB, bundleTopic);
269            props.put(AppType.BUNDLE_ACTION, bundleTopic);
270    
271            return props;
272        }
273    
274        public String getTopicPrefix() {
275            return topicPrefix;
276        }
277    
278        @Override
279        public void destroy() {
280            topicMap.clear();
281    
282        }
283    
284        @Override
285        public Class<? extends Service> getInterface() {
286            return JMSTopicService.class;
287        }
288    
289    }