001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.io.File;
021import java.io.FileInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FSDataInputStream;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.oozie.ErrorCode;
038import org.apache.oozie.dependency.hcat.HCatMessageHandler;
039import org.apache.oozie.jms.JMSConnectionInfo;
040import org.apache.oozie.util.HCatURI;
041import org.apache.oozie.util.MappingRule;
042import org.apache.oozie.util.XConfiguration;
043import org.apache.oozie.util.XLog;
044
045public class HCatAccessorService implements Service {
046
047    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HCatAccessorService.";
048    public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "jmsconnections";
049    public static final String HCAT_CONFIGURATION = CONF_PREFIX + "hcat.configuration";
050
051    private static XLog LOG;
052    private static String DELIMITER = "#";
053    private Configuration conf;
054    private JMSAccessorService jmsService;
055    private List<MappingRule> mappingRules;
056    private JMSConnectionInfo defaultJMSConnInfo;
057    private Configuration hcatConf;
058    /**
059     * Map of publisher(host:port) to JMS connection info
060     */
061    private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap;
062    /**
063     * List of non publishers(host:port)
064     */
065    private Set<String> nonJMSPublishers;
066    /**
067     * Mapping of table to the topic name for the table
068     */
069    private Map<String, String> registeredTopicsMap;
070
071    @Override
072    public void init(Services services) throws ServiceException {
073        LOG = XLog.getLog(getClass());
074        conf = services.getConf();
075        this.jmsService = services.get(JMSAccessorService.class);
076        initializeMappingRules();
077        this.nonJMSPublishers = new HashSet<String>();
078        this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>();
079        this.registeredTopicsMap = new HashMap<String, String>();
080        try {
081            loadHCatConf(services);
082        } catch(IOException ioe) {
083            throw new ServiceException(ErrorCode.E0100, HCatAccessorService.class.getName(), "An exception occured while attempting"
084                    + "to load the HCat Configuration", ioe);
085        }
086    }
087
088    private void loadHCatConf(Services services) throws IOException {
089        String path = conf.get(HCAT_CONFIGURATION);
090        if (path != null) {
091            if (path.startsWith("hdfs")) {
092                Path p = new Path(path);
093                HadoopAccessorService has = services.get(HadoopAccessorService.class);
094                try {
095                    FileSystem fs = has.createFileSystem(
096                            System.getProperty("user.name"), p.toUri(), has.createJobConf(p.toUri().getAuthority()));
097                    if (fs.exists(p)) {
098                        FSDataInputStream is = null;
099                        try {
100                            is = fs.open(p);
101                            hcatConf = new XConfiguration(is);
102                        } finally {
103                            if (is != null) {
104                                is.close();
105                            }
106                        }
107                        LOG.info("Loaded HCat Configuration: " + path);
108                    } else {
109                        LOG.warn("HCat Configuration could not be found at [" + path + "]");
110                    }
111                } catch (HadoopAccessorException hae) {
112                    throw new IOException(hae);
113                }
114            } else {
115                File f = new File(path);
116                if (f.exists()) {
117                    InputStream is = null;
118                    try {
119                        is = new FileInputStream(f);
120                        hcatConf = new XConfiguration(is);
121                    } finally {
122                        if (is != null) {
123                            is.close();
124                        }
125                    }
126                    LOG.info("Loaded HCat Configuration: " + path);
127                } else {
128                    LOG.warn("HCat Configuration could not be found at [" + path + "]");
129                }
130            }
131        }
132        else {
133            LOG.info("HCat Configuration not specified");
134        }
135    }
136
137    public Configuration getHCatConf() {
138        return hcatConf;
139    }
140
141    private void initializeMappingRules() {
142        String[] connections = conf.getStrings(JMS_CONNECTIONS_PROPERTIES);
143        if (connections != null) {
144            mappingRules = new ArrayList<MappingRule>(connections.length);
145            for (String connection : connections) {
146                String[] values = connection.split("=", 2);
147                String key = values[0].trim();
148                String value = values[1].trim();
149                if (key.equals("default")) {
150                    defaultJMSConnInfo = new JMSConnectionInfo(value);
151                }
152                else {
153                    mappingRules.add(new MappingRule(key, value));
154                }
155            }
156        }
157        else {
158            LOG.warn("No JMS connection defined");
159        }
160    }
161
162    /**
163     * Determine whether a given source URI publishes JMS messages
164     *
165     * @param sourceURI URI of the publisher
166     * @return true if we have JMS connection information for the source URI, else false
167     */
168    public boolean isKnownPublisher(URI sourceURI) {
169        if (nonJMSPublishers.contains(sourceURI.getAuthority())) {
170            return true;
171        }
172        else {
173            JMSConnectionInfo connInfo = publisherJMSConnInfoMap.get(sourceURI.getAuthority());
174            return connInfo == null ? (getJMSConnectionInfo(sourceURI) != null) : true;
175        }
176    }
177
178    /**
179     * Given a publisher host:port return the connection details of JMS server that the publisher
180     * publishes to
181     *
182     * @param publisherURI URI of the publisher
183     * @return JMSConnectionInfo to connect to the JMS server that the publisher publishes to
184     */
185    public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) {
186        String publisherAuthority = publisherURI.getAuthority();
187        JMSConnectionInfo connInfo = null;
188        if (publisherJMSConnInfoMap.containsKey(publisherAuthority)) {
189            connInfo = publisherJMSConnInfoMap.get(publisherAuthority);
190        }
191        else {
192            String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority;
193            for (MappingRule mr : mappingRules) {
194                String jndiPropertiesString = mr.applyRule(schemeWithAuthority);
195                if (jndiPropertiesString != null) {
196                    connInfo = new JMSConnectionInfo(jndiPropertiesString);
197                    publisherJMSConnInfoMap.put(publisherAuthority, connInfo);
198                    LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
199                    break;
200                }
201            }
202            if (connInfo == null && defaultJMSConnInfo != null) {
203                connInfo = defaultJMSConnInfo;
204                publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo);
205                LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
206            }
207            else {
208                nonJMSPublishers.add(publisherAuthority);
209                LOG.info("Adding hcat server [{0}] to the list of non JMS publishers", schemeWithAuthority);
210            }
211
212        }
213        return connInfo;
214    }
215
216    /**
217     * Check if we are already listening to the JMS topic for the table in the given hcatURI
218     *
219     * @param hcatURI hcatalog partition URI
220     * @return true if registered to a JMS topic for the table in the given hcatURI
221     */
222    public boolean isRegisteredForNotification(HCatURI hcatURI) {
223        return registeredTopicsMap.containsKey(getKeyForRegisteredTopicsMap(hcatURI));
224    }
225
226    /**
227     * Register for notifications on a JMS topic for the specified hcatalog table.
228     *
229     * @param hcatURI hcatalog partition URI
230     * @param topic JMS topic to register to
231     * @param msgHandler Handler which will process the messages received on the topic
232     */
233    public void registerForNotification(HCatURI hcatURI, String topic, HCatMessageHandler msgHandler) {
234        JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
235        jmsService.registerForNotification(connInfo, topic, msgHandler);
236        registeredTopicsMap.put(
237                getKeyForRegisteredTopicsMap(hcatURI), topic);
238    }
239
240    public void unregisterFromNotification(HCatURI hcatURI) {
241        String topic = registeredTopicsMap.remove(getKeyForRegisteredTopicsMap(hcatURI));
242        if (topic != null) {
243            JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
244            jmsService.unregisterFromNotification(connInfo, topic);
245        }
246    }
247
248    public void unregisterFromNotification(String server, String database, String table) {
249        String key = server + DELIMITER + database + DELIMITER + table;
250        String topic = registeredTopicsMap.remove(key);
251        if (topic != null) {
252            try {
253                JMSConnectionInfo connInfo = getJMSConnectionInfo(new URI("hcat://" + server));
254                jmsService.unregisterFromNotification(connInfo, topic);
255            }
256            catch (URISyntaxException e) {
257                LOG.warn("Error unregistering from notification for topic [{0}]. Hcat table=[{1}]", topic, key, e);
258            }
259        }
260    }
261
262    private String getKeyForRegisteredTopicsMap(HCatURI hcatURI) {
263        return hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb()
264                + DELIMITER + hcatURI.getTable();
265    }
266
267    @Override
268    public void destroy() {
269        publisherJMSConnInfoMap.clear();
270    }
271
272    @Override
273    public Class<? extends Service> getInterface() {
274        return HCatAccessorService.class;
275    }
276
277}