001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.net.HttpURLConnection;
023import java.net.URL;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.oozie.CoordinatorActionBean;
027import org.apache.oozie.ErrorCode;
028import org.apache.oozie.client.OozieClient;
029import org.apache.oozie.command.CommandException;
030import org.apache.oozie.command.PreconditionException;
031import org.apache.oozie.command.wf.NotificationXCommand;
032import org.apache.oozie.service.Services;
033import org.apache.oozie.util.LogUtils;
034import org.apache.oozie.util.ParamChecker;
035import org.apache.oozie.util.XConfiguration;
036import org.apache.oozie.util.XLog;
037
038/**
039 * This class will send the notification for the coordinator action
040 */
041public class CoordActionNotificationXCommand extends CoordinatorXCommand<Void> {
042
043    private final CoordinatorActionBean actionBean;
044    private static final String STATUS_PATTERN = "\\$status";
045    private static final String ACTION_ID_PATTERN = "\\$actionId";
046
047    //this variable is package private only for test purposes
048    int retries = 0;
049
050    public CoordActionNotificationXCommand(CoordinatorActionBean actionBean) {
051        super("coord_action_notification", "coord_action_notification", 0);
052        ParamChecker.notNull(actionBean, "Action Bean");
053        this.actionBean = actionBean;
054    }
055
056    /* (non-Javadoc)
057     * @see org.apache.oozie.command.XCommand#execute()
058     */
059    @Override
060    protected Void execute() throws CommandException {
061        LOG.info("STARTED Coordinator Notification actionId=" + actionBean.getId() + " : " + actionBean.getStatus());
062        Configuration conf;
063        try {
064            conf = new XConfiguration(new StringReader(actionBean.getRunConf()));
065        }
066        catch (IOException e1) {
067            LOG.warn("Configuration parse error. read from DB :" + actionBean.getRunConf());
068            throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
069        }
070        String url = conf.get(OozieClient.COORD_ACTION_NOTIFICATION_URL);
071        if (url != null) {
072            url = url.replaceAll(ACTION_ID_PATTERN, actionBean.getId());
073            url = url.replaceAll(STATUS_PATTERN, actionBean.getStatus().toString());
074            LOG.debug("Notification URL :" + url);
075            try {
076                int timeout = Services.get().getConf().getInt(
077                    NotificationXCommand.NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY,
078                    NotificationXCommand.NOTIFICATION_URL_CONNECTION_TIMEOUT_DEFAULT);
079                URL urlObj = new URL(url);
080                HttpURLConnection urlConn = (HttpURLConnection) urlObj.openConnection();
081                urlConn.setConnectTimeout(timeout);
082                urlConn.setReadTimeout(timeout);
083                if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
084                    handleRetry(url);
085                }
086            }
087            catch (IOException ex) {
088                handleRetry(url);
089            }
090        }
091        else {
092            LOG.info("No Notification URL is defined. Therefore nothing to notify for job " + actionBean.getJobId()
093                    + " action ID " + actionBean.getId());
094        }
095        LOG.info("ENDED Coordinator Notification actionId=" + actionBean.getId());
096        return null;
097    }
098
099    /**
100     * This method handles the retry for the coordinator action.
101     *
102     * @param url This is the URL where the notification has to be sent.
103     */
104    private void handleRetry(String url) {
105        if (retries < 3) {
106            retries++;
107            this.resetUsed();
108            queue(this, 60 * 1000);
109        }
110        else {
111            LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
112        }
113    }
114
115    /* (non-Javadoc)
116     * @see org.apache.oozie.command.XCommand#getEntityKey()
117     */
118    @Override
119    public String getEntityKey() {
120        return actionBean.getId();
121    }
122
123    /* (non-Javadoc)
124     * @see org.apache.oozie.command.XCommand#isLockRequired()
125     */
126    @Override
127    protected boolean isLockRequired() {
128        return false;
129    }
130
131    /* (non-Javadoc)
132     * @see org.apache.oozie.command.XCommand#loadState()
133     */
134    @Override
135    protected void loadState() throws CommandException {
136        LogUtils.setLogInfo(actionBean, logInfo);
137    }
138
139    /* (non-Javadoc)
140     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
141     */
142    @Override
143    protected void verifyPrecondition() throws CommandException, PreconditionException {
144    }
145}