001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.net.HttpURLConnection;
023    import java.net.URL;
024    
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.oozie.CoordinatorActionBean;
027    import org.apache.oozie.ErrorCode;
028    import org.apache.oozie.client.OozieClient;
029    import org.apache.oozie.command.CommandException;
030    import org.apache.oozie.store.CoordinatorStore;
031    import org.apache.oozie.store.StoreException;
032    import org.apache.oozie.util.XConfiguration;
033    import org.apache.oozie.util.XLog;
034    
035    public class CoordActionNotification extends CoordinatorCommand<Void> {
036    
037        private CoordinatorActionBean actionBean;
038        private static final String STATUS_PATTERN = "\\$status";
039        private static final String ACTION_ID_PATTERN = "\\$actionId";
040    
041        private int retries = 0;
042        private final XLog log = XLog.getLog(getClass());
043    
044        public CoordActionNotification(CoordinatorActionBean actionBean) {
045            super("coord_action_notification", "coord_action_notification", 0,
046                    XLog.STD);
047            this.actionBean = actionBean;
048        }
049    
050        @Override
051        protected Void call(CoordinatorStore store) throws StoreException,
052                CommandException {
053            setLogInfo(actionBean);
054            log.info("STARTED Coordinator Notification actionId="
055                    + actionBean.getId() + " : " + actionBean.getStatus());
056            Configuration conf;
057            try {
058                conf = new XConfiguration(new StringReader(actionBean.getRunConf()));
059            }
060            catch (IOException e1) {
061                log.warn("Configuration parse error. read from DB :"
062                        + actionBean.getRunConf());
063                throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
064            }
065            String url = conf.get(OozieClient.COORD_ACTION_NOTIFICATION_URL);
066            if (url != null) {
067                url = url.replaceAll(ACTION_ID_PATTERN, actionBean.getId());
068                url = url.replaceAll(STATUS_PATTERN, actionBean.getStatus()
069                        .toString());
070                log.debug("Notification URL :" + url);
071                try {
072                    URL urlObj = new URL(url);
073                    HttpURLConnection urlConn = (HttpURLConnection) urlObj
074                            .openConnection();
075                    if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
076                        handleRetry(url);
077                    }
078                }
079                catch (IOException ex) {
080                    handleRetry(url);
081                }
082            }
083            else {
084                log
085                        .info("No Notification URL is defined. Therefore nothing to notify for job "
086                                + actionBean.getJobId()
087                                + " action ID "
088                                + actionBean.getId());
089                // System.out.println("No Notification URL is defined. Therefore nothing is notified");
090            }
091            log.info("ENDED Coordinator Notification actionId="
092                    + actionBean.getId());
093            return null;
094        }
095    
096        private void handleRetry(String url) {
097            if (retries < 3) {
098                retries++;
099                queueCallable(this, 60 * 1000);
100            }
101            else {
102                XLog.getLog(getClass()).warn(XLog.OPS,
103                                             "could not send notification [{0}]", url);
104            }
105        }
106    
107    }