001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Properties; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.oozie.ErrorCode; 029import org.apache.oozie.AppType; 030import org.apache.oozie.client.event.SLAEvent; 031import org.apache.oozie.event.WorkflowJobEvent; 032import org.apache.oozie.executor.jpa.BundleJobGetForUserJPAExecutor; 033import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserJPAExecutor; 034import org.apache.oozie.executor.jpa.JPAExecutorException; 035import org.apache.oozie.executor.jpa.WorkflowJobGetForUserJPAExecutor; 036import org.apache.oozie.util.XLog; 037 038 039/** 040 * JMS Topic service to retrieve topic names from events or job id 041 * 042 */ 043public class JMSTopicService implements Service { 044 045 public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSTopicService."; 046 public static final String TOPIC_NAME = CONF_PREFIX + "topic.name"; 047 public static final String TOPIC_PREFIX = CONF_PREFIX + "topic.prefix"; 048 private static XLog LOG; 049 private Configuration conf; 050 private final Map<String, String> topicMap = new HashMap<String, String>(); 051 private static final List<String> JOB_TYPE_CONSTANTS = new ArrayList<String>(); 052 private static final List<String> ALLOWED_TOPIC_NAMES = new ArrayList<String>(); 053 private JPAService jpaService = Services.get().get(JPAService.class); 054 private String defaultTopicName = TopicType.USER.value; 055 private String topicPrefix; 056 057 static { 058 ALLOWED_TOPIC_NAMES.add(TopicType.USER.value); 059 ALLOWED_TOPIC_NAMES.add(TopicType.JOBID.value); 060 } 061 062 static { 063 JOB_TYPE_CONSTANTS.add(JobType.WORKFLOW.value); 064 JOB_TYPE_CONSTANTS.add(JobType.COORDINATOR.value); 065 JOB_TYPE_CONSTANTS.add(JobType.BUNDLE.value); 066 } 067 068 public static enum JobType { 069 WORKFLOW("WORKFLOW"), COORDINATOR("COORDINATOR"), BUNDLE("BUNDLE"); 070 private String value; 071 072 JobType(String value) { 073 this.value = value; 074 } 075 076 String getValue() { 077 return value; 078 } 079 080 } 081 082 public static enum TopicType { 083 USER("${username}"), JOBID("${jobId}"); 084 085 private String value; 086 087 TopicType(String value) { 088 this.value = value; 089 } 090 091 String getValue() { 092 return value; 093 } 094 095 } 096 097 @Override 098 public void init(Services services) throws ServiceException { 099 LOG = XLog.getLog(getClass()); 100 conf = services.getConf(); 101 parseTopicConfiguration(); 102 topicPrefix = conf.get(TOPIC_PREFIX, ""); 103 } 104 105 private void parseTopicConfiguration() throws ServiceException { 106 String topicName = ConfigurationService.get(conf, TOPIC_NAME); 107 if (topicName == null) { 108 throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic cannot be null "); 109 } 110 LOG.info("Topic Name is [{0}]", topicName); 111 String[] topic = topicName.trim().split(","); 112 for (int i = 0; i < topic.length; i++) { 113 String[] split = topic[i].trim().split("="); 114 if (split.length == 2) { 115 split[0] = split[0].trim(); 116 split[1] = split[1].trim(); 117 if (split[0].equals("default")) { 118 if (!ALLOWED_TOPIC_NAMES.contains(split[1])) { 119 throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Topic name " + split[1] 120 + " not allowed in default; allowed" + "topics are " + ALLOWED_TOPIC_NAMES); 121 } 122 defaultTopicName = split[1]; 123 } 124 else if (!JOB_TYPE_CONSTANTS.contains(split[0])) { 125 throw new ServiceException(ErrorCode.E0100, getClass().getName(), 126 "Incorrect job type for defining JMS topic: " + split[0] + " ;" + "allowed job types are " 127 + JOB_TYPE_CONSTANTS); 128 } 129 else if (!ALLOWED_TOPIC_NAMES.contains(split[1]) && split[1].contains("$")) { 130 throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic value " + split[1] 131 + " " + "for a job type is incorrect " + "Correct values are " + ALLOWED_TOPIC_NAMES); 132 } 133 else { 134 topicMap.put(split[0], split[1]); 135 } 136 } 137 else { 138 throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Property " + topic[i] 139 + "has incorrect syntax; It should be specified as key value pair"); 140 } 141 } 142 } 143 144 /** 145 * Retrieve topic from Job id 146 * @param jobId 147 * @return topicPrefix + topicName Retrieve topic from Job id 148 * @throws JPAExecutorException 149 */ 150 public String getTopic(String jobId) throws JPAExecutorException { 151 String topicName = null; 152 if (jobId.contains("-W@")) { 153 jobId = jobId.substring(0, jobId.indexOf('@')); 154 topicName = getTopicForWorkflow(jobId); 155 } 156 else if (jobId.endsWith("-W")) { 157 topicName = getTopicForWorkflow(jobId); 158 } 159 else if (jobId.contains("-C@")) { 160 jobId = jobId.substring(0, jobId.indexOf('@')); 161 topicName = getTopicForCoordinator(jobId); 162 } 163 else if (jobId.endsWith("C")) { 164 topicName = getTopicForCoordinator(jobId); 165 } 166 else if (jobId.contains("-B_")) { 167 jobId = jobId.substring(0, jobId.indexOf('_')); 168 topicName = getTopicForBundle(jobId); 169 } 170 171 else if (jobId.endsWith("B")) { 172 topicName = getTopicForBundle(jobId); 173 } 174 return topicPrefix + topicName; 175 } 176 177 /** 178 * Retrieve Topic 179 * 180 * @param appType 181 * @param user 182 * @param jobId 183 * @param parentJobId 184 * @return topicName 185 */ 186 187 public String getTopic(AppType appType, String user, String jobId, String parentJobId) { 188 String topicName = null; 189 String id = jobId; 190 if (appType == AppType.COORDINATOR_JOB || appType == AppType.COORDINATOR_ACTION) { 191 topicName = topicMap.get(JobType.COORDINATOR.value); 192 if (appType == AppType.COORDINATOR_ACTION) { 193 id = parentJobId; 194 } 195 } 196 else if (appType == AppType.WORKFLOW_JOB || appType == AppType.WORKFLOW_ACTION) { 197 topicName = topicMap.get(JobType.WORKFLOW.value); 198 if (appType == AppType.WORKFLOW_ACTION) { 199 id = parentJobId; 200 } 201 } 202 else if (appType == AppType.BUNDLE_JOB || appType == AppType.BUNDLE_ACTION) { 203 topicName = topicMap.get(JobType.BUNDLE.value); 204 if (appType == AppType.BUNDLE_ACTION) { 205 id = parentJobId; 206 } 207 } 208 209 if (topicName == null) { 210 if (defaultTopicName.equals(TopicType.USER.value)) { 211 topicName = user; 212 } 213 else if (defaultTopicName.equals(TopicType.JOBID.value)) { 214 topicName = id; 215 } 216 } 217 return topicPrefix + topicName; 218 } 219 220 private String getTopicForWorkflow(String jobId) throws JPAExecutorException { 221 String topicName = topicMap.get(JobType.WORKFLOW.value); 222 if (topicName == null) { 223 topicName = defaultTopicName; 224 } 225 if (topicName.equals(TopicType.USER.value)) { 226 topicName = jpaService.execute(new WorkflowJobGetForUserJPAExecutor(jobId)); 227 } 228 else if (topicName.equals(TopicType.JOBID.value)) { 229 topicName = jobId; 230 } 231 return topicName; 232 233 } 234 235 private String getTopicForCoordinator(String jobId) throws JPAExecutorException { 236 String topicName = topicMap.get(JobType.COORDINATOR.value); 237 238 if (topicName == null) { 239 topicName = defaultTopicName; 240 } 241 if (topicName.equals(TopicType.USER.value)) { 242 topicName = jpaService.execute(new CoordinatorJobGetForUserJPAExecutor(jobId)); 243 } 244 else if (topicName.equals(TopicType.JOBID.value)) { 245 topicName = jobId; 246 } 247 return topicName; 248 } 249 250 private String getTopicForBundle(String jobId) throws JPAExecutorException { 251 String topicName = topicMap.get(JobType.BUNDLE.value); 252 if (topicName == null) { 253 topicName = defaultTopicName; 254 } 255 if (topicName.equals(TopicType.USER.value)) { 256 topicName = jpaService.execute(new BundleJobGetForUserJPAExecutor(jobId)); 257 } 258 else if (topicName.equals(TopicType.JOBID.value)) { 259 topicName = jobId; 260 } 261 return topicName; 262 } 263 264 public Properties getTopicPatternProperties() { 265 Properties props = new Properties(); 266 String wfTopic = topicMap.get(JobType.WORKFLOW.value); 267 wfTopic = (wfTopic != null) ? wfTopic : defaultTopicName; 268 props.put(AppType.WORKFLOW_JOB, wfTopic); 269 props.put(AppType.WORKFLOW_ACTION, wfTopic); 270 271 String coordTopic = topicMap.get(JobType.COORDINATOR.value); 272 coordTopic = (coordTopic != null) ? coordTopic : defaultTopicName; 273 props.put(AppType.COORDINATOR_JOB, coordTopic); 274 props.put(AppType.COORDINATOR_ACTION, coordTopic); 275 276 String bundleTopic = topicMap.get(JobType.BUNDLE.value); 277 bundleTopic = (bundleTopic != null) ? bundleTopic : defaultTopicName; 278 props.put(AppType.BUNDLE_JOB, bundleTopic); 279 props.put(AppType.BUNDLE_ACTION, bundleTopic); 280 281 return props; 282 } 283 284 public String getTopicPrefix() { 285 return topicPrefix; 286 } 287 288 @Override 289 public void destroy() { 290 topicMap.clear(); 291 292 } 293 294 @Override 295 public Class<? extends Service> getInterface() { 296 return JMSTopicService.class; 297 } 298 299}