001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import org.apache.oozie.client.OozieClient;
021import org.apache.oozie.WorkflowActionBean;
022import org.apache.oozie.WorkflowJobBean;
023import org.apache.oozie.command.CommandException;
024import org.apache.oozie.command.PreconditionException;
025import org.apache.oozie.service.Services;
026import org.apache.oozie.util.LogUtils;
027import org.apache.oozie.util.ParamChecker;
028import org.apache.oozie.util.XLog;
029
030import java.io.IOException;
031import java.net.HttpURLConnection;
032import java.net.URL;
033
034public class NotificationXCommand extends WorkflowXCommand<Void> {
035
036    public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = "oozie.notification.url.connection.timeout";
037    public static final int NOTIFICATION_URL_CONNECTION_TIMEOUT_DEFAULT = 10 * 1000; // 10 seconds
038
039    private static final String STATUS_PATTERN = "\\$status";
040    private static final String JOB_ID_PATTERN = "\\$jobId";
041    private static final String NODE_NAME_PATTERN = "\\$nodeName";
042
043    private String url;
044
045    //this variable is package private only for test purposes
046    int retries = 0;
047
048    public NotificationXCommand(WorkflowJobBean workflow) {
049        super("job.notification", "job.notification", 0);
050        ParamChecker.notNull(workflow, "workflow");
051        LogUtils.setLogInfo(workflow, logInfo);
052        url = workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL);
053        if (url != null) {
054            url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
055            url = url.replaceAll(STATUS_PATTERN, workflow.getStatus().toString());
056        }
057    }
058
059    public NotificationXCommand(WorkflowJobBean workflow, WorkflowActionBean action) {
060        super("action.notification", "job.notification", 0);
061        ParamChecker.notNull(workflow, "workflow");
062        ParamChecker.notNull(action, "action");
063        LogUtils.setLogInfo(workflow, logInfo);
064        LogUtils.setLogInfo(action, logInfo);
065        url = workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL);
066        if (url != null) {
067            url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
068            url = url.replaceAll(NODE_NAME_PATTERN, action.getName());
069            if (action.isComplete()) {
070                url = url.replaceAll(STATUS_PATTERN, "T:" + action.getTransition());
071            }
072            else {
073                url = url.replaceAll(STATUS_PATTERN, "S:" + action.getStatus().toString());
074            }
075        }
076    }
077
078    @Override
079    protected boolean isLockRequired() {
080        return false;
081    }
082
083    @Override
084    public String getEntityKey() {
085        return url;
086    }
087
088    @Override
089    protected void loadState() throws CommandException {
090    }
091
092    @Override
093    protected void verifyPrecondition() throws CommandException, PreconditionException {
094    }
095
096    @Override
097    protected Void execute() throws CommandException {
098        //if command is requeue, the logInfo has to set to thread local Info object again
099        LogUtils.setLogInfo(logInfo);
100        if (url != null) {
101            int timeout = Services.get().getConf().getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY,
102                                                          NOTIFICATION_URL_CONNECTION_TIMEOUT_DEFAULT);
103            try {
104                URL url = new URL(this.url);
105                HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
106                urlConn.setConnectTimeout(timeout);
107                urlConn.setReadTimeout(timeout);
108                if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
109                    handleRetry();
110                }
111            }
112            catch (IOException ex) {
113                handleRetry();
114            }
115        }
116        return null;
117    }
118
119    private void handleRetry() {
120        if (retries < 3) {
121            retries++;
122            this.resetUsed();
123            queue(this, 60 * 1000);
124        }
125        else {
126            LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
127        }
128    }
129
130    public String getUrl() {
131        return url;
132    }
133
134}