/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.AppType;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.executor.jpa.BundleJobGetForUserJPAExecutor;
import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobGetForUserJPAExecutor;
import org.apache.oozie.util.XLog;


/**
 * JMS Topic service to retrieve topic names from events or job ids.
 * <p>
 * The topic configuration ({@link #TOPIC_NAME}) is a comma separated list of <code>key=value</code> pairs. The key is
 * either <code>default</code> or one of the {@link JobType} names; the value is either one of the {@link TopicType}
 * patterns or a fixed topic name (a fixed name must not contain <code>$</code>). Every topic returned by the
 * <code>getTopic</code> methods is prefixed with the value of {@link #TOPIC_PREFIX}.
 */
public class JMSTopicService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSTopicService.";
    public static final String TOPIC_NAME = CONF_PREFIX + "topic.name";
    public static final String TOPIC_PREFIX = CONF_PREFIX + "topic.prefix";

    // Markers that separate a job id from the action specific suffix of an action id.
    private static final String WORKFLOW_ACTION_MARKER = "-W@";
    private static final String COORDINATOR_ACTION_MARKER = "-C@";
    private static final String BUNDLE_ACTION_MARKER = "-B_";

    private static XLog LOG;
    private Configuration conf;
    // Job type name -> configured topic (a TopicType pattern or a fixed topic name).
    private final Map<String, String> topicMap = new HashMap<String, String>();
    private static final List<String> JOB_TYPE_CONSTANTS = new ArrayList<String>();
    private static final List<String> ALLOWED_TOPIC_NAMES = new ArrayList<String>();
    // Resolved in init(): looking it up in a field initializer would dereference the global Services
    // singleton at construction time and fail with a NullPointerException if it is not set yet.
    private JPAService jpaService;
    private String defaultTopicName = TopicType.USER.value;
    private String topicPrefix;

    static {
        ALLOWED_TOPIC_NAMES.add(TopicType.USER.value);
        ALLOWED_TOPIC_NAMES.add(TopicType.JOBID.value);
    }

    static {
        JOB_TYPE_CONSTANTS.add(JobType.WORKFLOW.value);
        JOB_TYPE_CONSTANTS.add(JobType.COORDINATOR.value);
        JOB_TYPE_CONSTANTS.add(JobType.BUNDLE.value);
    }

    /**
     * Job types that may be used as keys in the topic configuration.
     */
    public static enum JobType {
        WORKFLOW("WORKFLOW"), COORDINATOR("COORDINATOR"), BUNDLE("BUNDLE");

        private final String value;

        JobType(String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }

    }

    /**
     * Topic patterns that are substituted when a topic is resolved: {@link #USER} with the user of the job and
     * {@link #JOBID} with the id of the job.
     */
    public static enum TopicType {
        USER("${username}"), JOBID("${jobId}");

        private final String value;

        TopicType(String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }

    }

    /**
     * Initialize the service: read the configuration, look up the {@link JPAService}, parse the topic
     * configuration and read the topic prefix (empty if not configured).
     *
     * @param services services instance providing the configuration and the other services
     * @throws ServiceException thrown if the topic configuration is invalid
     */
    @Override
    public void init(Services services) throws ServiceException {
        LOG = XLog.getLog(getClass());
        conf = services.getConf();
        jpaService = services.get(JPAService.class);
        // Start from a clean state so that a repeated init() does not keep mappings of a previous configuration.
        topicMap.clear();
        defaultTopicName = TopicType.USER.value;
        parseTopicConfiguration();
        topicPrefix = conf.get(TOPIC_PREFIX, "");
    }

    /**
     * Parse the {@link #TOPIC_NAME} property into the default topic pattern and the per job type topic mapping.
     *
     * @throws ServiceException thrown if an entry is not a <code>key=value</code> pair, if the key is neither
     *         <code>default</code> nor a job type, if the default is not one of the allowed patterns, or if a job
     *         type value contains <code>$</code> without being one of the allowed patterns
     */
    private void parseTopicConfiguration() throws ServiceException {
        String topicName = conf.get(TOPIC_NAME, "default=" + TopicType.USER.value);
        if (topicName == null) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic cannot be null ");
        }
        LOG.info("Topic Name is [{0}]", topicName);
        String[] entries = topicName.trim().split(",");
        for (int i = 0; i < entries.length; i++) {
            String[] keyValue = entries[i].trim().split("=");
            if (keyValue.length != 2) {
                throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Property " + entries[i]
                        + " has incorrect syntax; It should be specified as key value pair");
            }
            String key = keyValue[0].trim();
            String value = keyValue[1].trim();
            if (key.equals("default")) {
                if (!ALLOWED_TOPIC_NAMES.contains(value)) {
                    throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Topic name " + value
                            + " not allowed in default; allowed topics are " + ALLOWED_TOPIC_NAMES);
                }
                defaultTopicName = value;
            }
            else if (!JOB_TYPE_CONSTANTS.contains(key)) {
                throw new ServiceException(ErrorCode.E0100, getClass().getName(),
                        "Incorrect job type for defining JMS topic: " + key + " ;" + "allowed job types are "
                                + JOB_TYPE_CONSTANTS);
            }
            else if (!ALLOWED_TOPIC_NAMES.contains(value) && value.contains("$")) {
                // A '$' marks a pattern; only the known patterns can be substituted later on.
                throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic value " + value
                        + " for a job type is incorrect. Correct values are " + ALLOWED_TOPIC_NAMES);
            }
            else {
                topicMap.put(key, value);
            }
        }
    }

    /**
     * Retrieve topic from Job id
     * <p>
     * Action ids (containing <code>-W@</code>, <code>-C@</code> or <code>-B_</code>) are reduced to the id of the
     * job they belong to before the topic is resolved. Coordinator and bundle job ids are recognized by their
     * trailing <code>C</code> / <code>B</code>, workflow job ids by a trailing <code>-W</code>.
     *
     * @param jobId job id or action id
     * @return the topic prefix followed by the topic resolved for the job
     * @throws JPAExecutorException thrown if the user of the job cannot be retrieved
     * @throws IllegalArgumentException if the id is <code>null</code> or does not match any known id format
     */
    public String getTopic(String jobId) throws JPAExecutorException {
        if (jobId == null) {
            throw new IllegalArgumentException("Job id cannot be null");
        }
        String topicName;
        if (jobId.contains(WORKFLOW_ACTION_MARKER)) {
            topicName = getTopicForWorkflow(toJobId(jobId, WORKFLOW_ACTION_MARKER));
        }
        else if (jobId.endsWith("-W")) {
            topicName = getTopicForWorkflow(jobId);
        }
        else if (jobId.contains(COORDINATOR_ACTION_MARKER)) {
            topicName = getTopicForCoordinator(toJobId(jobId, COORDINATOR_ACTION_MARKER));
        }
        else if (jobId.endsWith("C")) {
            topicName = getTopicForCoordinator(jobId);
        }
        else if (jobId.contains(BUNDLE_ACTION_MARKER)) {
            topicName = getTopicForBundle(toJobId(jobId, BUNDLE_ACTION_MARKER));
        }
        else if (jobId.endsWith("B")) {
            topicName = getTopicForBundle(jobId);
        }
        else {
            // Fail instead of handing out a bogus "<prefix>null" topic.
            throw new IllegalArgumentException("Cannot determine JMS topic, unrecognized job id [" + jobId + "]");
        }
        return topicPrefix + topicName;
    }

    /**
     * Retrieve Topic
     * <p>
     * The topic configured for the job type of <code>appType</code> is used, falling back to the default topic
     * pattern. The user pattern is replaced with <code>user</code>; the job id pattern is replaced with the id of
     * the job: <code>parentJobId</code> for coordinator actions, the action id cut at its action marker for
     * workflow and bundle actions, <code>jobId</code> otherwise. A fixed topic name is used as is.
     *
     * @param appType application type the topic is requested for
     * @param user user of the job
     * @param jobId id of the job or action
     * @param parentJobId id of the parent; used as job id for coordinator actions
     * @return topicName the topic prefix followed by the resolved topic
     */
    public String getTopic(AppType appType, String user, String jobId, String parentJobId) {
        String pattern;
        String id = jobId;
        if (appType == AppType.COORDINATOR_JOB || appType == AppType.COORDINATOR_ACTION) {
            pattern = getTopicPattern(JobType.COORDINATOR);
            if (appType == AppType.COORDINATOR_ACTION) {
                id = parentJobId;
            }
        }
        else if (appType == AppType.WORKFLOW_JOB || appType == AppType.WORKFLOW_ACTION) {
            pattern = getTopicPattern(JobType.WORKFLOW);
            if (appType == AppType.WORKFLOW_ACTION) {
                // Same reduction as getTopic(String), so that both methods agree on the topic of an action.
                id = toJobId(jobId, WORKFLOW_ACTION_MARKER);
            }
        }
        else if (appType == AppType.BUNDLE_JOB || appType == AppType.BUNDLE_ACTION) {
            pattern = getTopicPattern(JobType.BUNDLE);
            if (appType == AppType.BUNDLE_ACTION) {
                id = toJobId(jobId, BUNDLE_ACTION_MARKER);
            }
        }
        else {
            pattern = defaultTopicName;
        }

        String topicName = pattern;
        if (TopicType.USER.value.equals(pattern)) {
            topicName = user;
        }
        else if (TopicType.JOBID.value.equals(pattern)) {
            topicName = id;
        }
        return topicPrefix + topicName;
    }

    /**
     * Return the topic configured for a job type, or the default topic pattern if none is configured.
     *
     * @param jobType job type
     * @return a topic pattern or a fixed topic name
     */
    private String getTopicPattern(JobType jobType) {
        String pattern = topicMap.get(jobType.value);
        return (pattern != null) ? pattern : defaultTopicName;
    }

    /**
     * Cut an action id right after the job type letter of the given marker. Ids that do not contain the marker
     * (and <code>null</code>) are returned unchanged.
     *
     * @param id job id or action id
     * @param actionMarker one of the action markers, e.g. <code>-W@</code>
     * @return the id up to and including the job type letter
     */
    private static String toJobId(String id, String actionMarker) {
        if (id == null) {
            return null;
        }
        int markerPos = id.indexOf(actionMarker);
        // Keep "-W" / "-C" / "-B", drop the separator character and everything after it.
        return (markerPos < 0) ? id : id.substring(0, markerPos + actionMarker.length() - 1);
    }

    /**
     * Resolve the topic (without prefix) for a workflow job.
     *
     * @param jobId workflow job id
     * @return the user of the job, the job id or the fixed topic name, depending on the configuration
     * @throws JPAExecutorException thrown if the user of the job cannot be retrieved
     */
    private String getTopicForWorkflow(String jobId) throws JPAExecutorException {
        String topicName = getTopicPattern(JobType.WORKFLOW);
        if (topicName.equals(TopicType.USER.value)) {
            topicName = jpaService.execute(new WorkflowJobGetForUserJPAExecutor(jobId));
        }
        else if (topicName.equals(TopicType.JOBID.value)) {
            topicName = jobId;
        }
        return topicName;
    }

    /**
     * Resolve the topic (without prefix) for a coordinator job.
     *
     * @param jobId coordinator job id
     * @return the user of the job, the job id or the fixed topic name, depending on the configuration
     * @throws JPAExecutorException thrown if the user of the job cannot be retrieved
     */
    private String getTopicForCoordinator(String jobId) throws JPAExecutorException {
        String topicName = getTopicPattern(JobType.COORDINATOR);
        if (topicName.equals(TopicType.USER.value)) {
            topicName = jpaService.execute(new CoordinatorJobGetForUserJPAExecutor(jobId));
        }
        else if (topicName.equals(TopicType.JOBID.value)) {
            topicName = jobId;
        }
        return topicName;
    }

    /**
     * Resolve the topic (without prefix) for a bundle job.
     *
     * @param jobId bundle job id
     * @return the user of the job, the job id or the fixed topic name, depending on the configuration
     * @throws JPAExecutorException thrown if the user of the job cannot be retrieved
     */
    private String getTopicForBundle(String jobId) throws JPAExecutorException {
        String topicName = getTopicPattern(JobType.BUNDLE);
        if (topicName.equals(TopicType.USER.value)) {
            topicName = jpaService.execute(new BundleJobGetForUserJPAExecutor(jobId));
        }
        else if (topicName.equals(TopicType.JOBID.value)) {
            topicName = jobId;
        }
        return topicName;
    }

    /**
     * Return the unresolved topic (pattern or fixed name, without prefix) for every application type.
     * <p>
     * Note that the keys of the returned properties are the {@link AppType} constants themselves, not strings,
     * so entries have to be read with <code>get(Object)</code> rather than <code>getProperty(String)</code>.
     *
     * @return properties mapping each application type to its topic
     */
    public Properties getTopicPatternProperties() {
        Properties props = new Properties();
        String wfTopic = getTopicPattern(JobType.WORKFLOW);
        props.put(AppType.WORKFLOW_JOB, wfTopic);
        props.put(AppType.WORKFLOW_ACTION, wfTopic);

        String coordTopic = getTopicPattern(JobType.COORDINATOR);
        props.put(AppType.COORDINATOR_JOB, coordTopic);
        props.put(AppType.COORDINATOR_ACTION, coordTopic);

        String bundleTopic = getTopicPattern(JobType.BUNDLE);
        props.put(AppType.BUNDLE_JOB, bundleTopic);
        props.put(AppType.BUNDLE_ACTION, bundleTopic);

        return props;
    }

    /**
     * Return the prefix that is prepended to every topic.
     *
     * @return the topic prefix
     */
    public String getTopicPrefix() {
        return topicPrefix;
    }

    /**
     * Destroy the service, discarding the per job type topic mapping.
     */
    @Override
    public void destroy() {
        topicMap.clear();
    }

    /**
     * Return the public interface of the service.
     *
     * @return {@link JMSTopicService}
     */
    @Override
    public Class<? extends Service> getInterface() {
        return JMSTopicService.class;
    }

}