This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.List;
023 import java.util.Map;
024 import java.util.Properties;
025
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.oozie.ErrorCode;
028 import org.apache.oozie.AppType;
029 import org.apache.oozie.client.event.SLAEvent;
030 import org.apache.oozie.event.WorkflowJobEvent;
031 import org.apache.oozie.executor.jpa.BundleJobGetForUserJPAExecutor;
032 import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserJPAExecutor;
033 import org.apache.oozie.executor.jpa.JPAExecutorException;
034 import org.apache.oozie.executor.jpa.WorkflowJobGetForUserJPAExecutor;
035 import org.apache.oozie.util.XLog;
036
037
038 /**
039 * JMS Topic service to retrieve topic names from events or job id
040 *
041 */
042 public class JMSTopicService implements Service {
043
044 public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSTopicService.";
045 public static final String TOPIC_NAME = CONF_PREFIX + "topic.name";
046 public static final String TOPIC_PREFIX = CONF_PREFIX + "topic.prefix";
047 private static XLog LOG;
048 private Configuration conf;
049 private final Map<String, String> topicMap = new HashMap<String, String>();
050 private static final List<String> JOB_TYPE_CONSTANTS = new ArrayList<String>();
051 private static final List<String> ALLOWED_TOPIC_NAMES = new ArrayList<String>();
052 private JPAService jpaService = Services.get().get(JPAService.class);
053 private String defaultTopicName = TopicType.USER.value;
054 private String topicPrefix;
055
056 static {
057 ALLOWED_TOPIC_NAMES.add(TopicType.USER.value);
058 ALLOWED_TOPIC_NAMES.add(TopicType.JOBID.value);
059 }
060
061 static {
062 JOB_TYPE_CONSTANTS.add(JobType.WORKFLOW.value);
063 JOB_TYPE_CONSTANTS.add(JobType.COORDINATOR.value);
064 JOB_TYPE_CONSTANTS.add(JobType.BUNDLE.value);
065 }
066
067 public static enum JobType {
068 WORKFLOW("WORKFLOW"), COORDINATOR("COORDINATOR"), BUNDLE("BUNDLE");
069 private String value;
070
071 JobType(String value) {
072 this.value = value;
073 }
074
075 String getValue() {
076 return value;
077 }
078
079 }
080
081 public static enum TopicType {
082 USER("${username}"), JOBID("${jobId}");
083
084 private String value;
085
086 TopicType(String value) {
087 this.value = value;
088 }
089
090 String getValue() {
091 return value;
092 }
093
094 }
095
096 @Override
097 public void init(Services services) throws ServiceException {
098 LOG = XLog.getLog(getClass());
099 conf = services.getConf();
100 parseTopicConfiguration();
101 topicPrefix = conf.get(TOPIC_PREFIX, "");
102 }
103
104 private void parseTopicConfiguration() throws ServiceException {
105 String topicName = conf.get(TOPIC_NAME, "default=" + TopicType.USER.value);
106 if (topicName == null) {
107 throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic cannot be null ");
108 }
109 LOG.info("Topic Name is [{0}]", topicName);
110 String[] topic = topicName.trim().split(",");
111 for (int i = 0; i < topic.length; i++) {
112 String[] split = topic[i].trim().split("=");
113 if (split.length == 2) {
114 split[0] = split[0].trim();
115 split[1] = split[1].trim();
116 if (split[0].equals("default")) {
117 if (!ALLOWED_TOPIC_NAMES.contains(split[1])) {
118 throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Topic name " + split[1]
119 + " not allowed in default; allowed" + "topics are " + ALLOWED_TOPIC_NAMES);
120 }
121 defaultTopicName = split[1];
122 }
123 else if (!JOB_TYPE_CONSTANTS.contains(split[0])) {
124 throw new ServiceException(ErrorCode.E0100, getClass().getName(),
125 "Incorrect job type for defining JMS topic: " + split[0] + " ;" + "allowed job types are "
126 + JOB_TYPE_CONSTANTS);
127 }
128 else if (!ALLOWED_TOPIC_NAMES.contains(split[1]) && split[1].contains("$")) {
129 throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic value " + split[1]
130 + " " + "for a job type is incorrect " + "Correct values are " + ALLOWED_TOPIC_NAMES);
131 }
132 else {
133 topicMap.put(split[0], split[1]);
134 }
135 }
136 else {
137 throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Property " + topic[i]
138 + "has incorrect syntax; It should be specified as key value pair");
139 }
140 }
141 }
142
143 /**
144 * Retrieve topic from Job id
145 * @param jobId
146 * @return
147 * @throws JPAExecutorException
148 */
149 public String getTopic(String jobId) throws JPAExecutorException {
150 String topicName = null;
151 if (jobId.contains("-W@")) {
152 jobId = jobId.substring(0, jobId.indexOf('@'));
153 topicName = getTopicForWorkflow(jobId);
154 }
155 else if (jobId.endsWith("-W")) {
156 topicName = getTopicForWorkflow(jobId);
157 }
158 else if (jobId.contains("-C@")) {
159 jobId = jobId.substring(0, jobId.indexOf('@'));
160 topicName = getTopicForCoordinator(jobId);
161 }
162 else if (jobId.endsWith("C")) {
163 topicName = getTopicForCoordinator(jobId);
164 }
165 else if (jobId.contains("-B_")) {
166 jobId = jobId.substring(0, jobId.indexOf('_'));
167 topicName = getTopicForBundle(jobId);
168 }
169
170 else if (jobId.endsWith("B")) {
171 topicName = getTopicForBundle(jobId);
172 }
173 return topicPrefix + topicName;
174 }
175
176 /**
177 * Retrieve Topic
178 *
179 * @param appType
180 * @param user
181 * @param jobId
182 * @param parentJobId
183 * @return topicName
184 */
185
186 public String getTopic(AppType appType, String user, String jobId, String parentJobId) {
187 String topicName = null;
188 String id = jobId;
189 if (appType == AppType.COORDINATOR_JOB || appType == AppType.COORDINATOR_ACTION) {
190 topicName = topicMap.get(JobType.COORDINATOR.value);
191 if (appType == AppType.COORDINATOR_ACTION) {
192 id = parentJobId;
193 }
194 }
195 else if (appType == AppType.WORKFLOW_JOB || appType == AppType.WORKFLOW_ACTION) {
196 topicName = topicMap.get(JobType.WORKFLOW);
197 }
198
199 if (topicName == null) {
200 if (defaultTopicName.equals(TopicType.USER.value)) {
201 topicName = user;
202 }
203 else if (defaultTopicName.equals(TopicType.JOBID.value)) {
204 topicName = id;
205 }
206 }
207 return topicPrefix + topicName;
208 }
209
210 private String getTopicForWorkflow(String jobId) throws JPAExecutorException {
211 String topicName = topicMap.get(JobType.WORKFLOW.value);
212 if (topicName == null) {
213 topicName = defaultTopicName;
214 }
215 if (topicName.equals(TopicType.USER.value)) {
216 topicName = jpaService.execute(new WorkflowJobGetForUserJPAExecutor(jobId));
217 }
218 else if (topicName.equals(TopicType.JOBID.value)) {
219 topicName = jobId;
220 }
221 return topicName;
222
223 }
224
225 private String getTopicForCoordinator(String jobId) throws JPAExecutorException {
226 String topicName = topicMap.get(JobType.COORDINATOR.value);
227
228 if (topicName == null) {
229 topicName = defaultTopicName;
230 }
231 if (topicName.equals(TopicType.USER.value)) {
232 topicName = jpaService.execute(new CoordinatorJobGetForUserJPAExecutor(jobId));
233 }
234 else if (topicName.equals(TopicType.JOBID.value)) {
235 topicName = jobId;
236 }
237 return topicName;
238 }
239
240 private String getTopicForBundle(String jobId) throws JPAExecutorException {
241 String topicName = topicMap.get(JobType.BUNDLE.value);
242 if (topicName == null) {
243 topicName = defaultTopicName;
244 }
245 if (topicName.equals(TopicType.USER.value)) {
246 topicName = jpaService.execute(new BundleJobGetForUserJPAExecutor(jobId));
247 }
248 else if (topicName.equals(TopicType.JOBID.value)) {
249 topicName = jobId;
250 }
251 return topicName;
252 }
253
254 public Properties getTopicPatternProperties() {
255 Properties props = new Properties();
256 String wfTopic = topicMap.get(JobType.WORKFLOW.value);
257 wfTopic = (wfTopic != null) ? wfTopic : defaultTopicName;
258 props.put(AppType.WORKFLOW_JOB, wfTopic);
259 props.put(AppType.WORKFLOW_ACTION, wfTopic);
260
261 String coordTopic = topicMap.get(JobType.COORDINATOR.value);
262 coordTopic = (coordTopic != null) ? coordTopic : defaultTopicName;
263 props.put(AppType.COORDINATOR_JOB, coordTopic);
264 props.put(AppType.COORDINATOR_ACTION, coordTopic);
265
266 String bundleTopic = topicMap.get(JobType.BUNDLE.value);
267 bundleTopic = (bundleTopic != null) ? bundleTopic : defaultTopicName;
268 props.put(AppType.BUNDLE_JOB, bundleTopic);
269 props.put(AppType.BUNDLE_ACTION, bundleTopic);
270
271 return props;
272 }
273
274 public String getTopicPrefix() {
275 return topicPrefix;
276 }
277
278 @Override
279 public void destroy() {
280 topicMap.clear();
281
282 }
283
284 @Override
285 public Class<? extends Service> getInterface() {
286 return JMSTopicService.class;
287 }
288
289 }