001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.oozie.client.OozieClient;
021    import org.apache.oozie.WorkflowActionBean;
022    import org.apache.oozie.WorkflowJobBean;
023    import org.apache.oozie.command.CommandException;
024    import org.apache.oozie.command.PreconditionException;
025    import org.apache.oozie.service.Services;
026    import org.apache.oozie.util.LogUtils;
027    import org.apache.oozie.util.ParamChecker;
028    import org.apache.oozie.util.XLog;
029    
030    import java.io.IOException;
031    import java.net.HttpURLConnection;
032    import java.net.URL;
033    
034    public class NotificationXCommand extends WorkflowXCommand<Void> {
035    
036        public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = "oozie.notification.url.connection.timeout";
037        public static final int NOTIFICATION_URL_CONNECTION_TIMEOUT_DEFAULT = 10 * 1000; // 10 seconds
038    
039        private static final String STATUS_PATTERN = "\\$status";
040        private static final String JOB_ID_PATTERN = "\\$jobId";
041        private static final String NODE_NAME_PATTERN = "\\$nodeName";
042    
043        private String url;
044    
045        //this variable is package private only for test purposes
046        int retries = 0;
047    
048        public NotificationXCommand(WorkflowJobBean workflow) {
049            super("job.notification", "job.notification", 0);
050            ParamChecker.notNull(workflow, "workflow");
051            LogUtils.setLogInfo(workflow, logInfo);
052            url = workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL);
053            if (url != null) {
054                url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
055                url = url.replaceAll(STATUS_PATTERN, workflow.getStatus().toString());
056            }
057        }
058    
059        public NotificationXCommand(WorkflowJobBean workflow, WorkflowActionBean action) {
060            super("action.notification", "job.notification", 0);
061            ParamChecker.notNull(workflow, "workflow");
062            ParamChecker.notNull(action, "action");
063            LogUtils.setLogInfo(workflow, logInfo);
064            LogUtils.setLogInfo(action, logInfo);
065            url = workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL);
066            if (url != null) {
067                url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
068                url = url.replaceAll(NODE_NAME_PATTERN, action.getName());
069                if (action.isComplete()) {
070                    url = url.replaceAll(STATUS_PATTERN, "T:" + action.getTransition());
071                }
072                else {
073                    url = url.replaceAll(STATUS_PATTERN, "S:" + action.getStatus().toString());
074                }
075            }
076        }
077    
078        @Override
079        protected boolean isLockRequired() {
080            return false;
081        }
082    
083        @Override
084        public String getEntityKey() {
085            return url;
086        }
087    
088        @Override
089        protected void loadState() throws CommandException {
090        }
091    
092        @Override
093        protected void verifyPrecondition() throws CommandException, PreconditionException {
094        }
095    
096        @Override
097        protected Void execute() throws CommandException {
098            //if command is requeue, the logInfo has to set to thread local Info object again
099            LogUtils.setLogInfo(logInfo);
100            if (url != null) {
101                int timeout = Services.get().getConf().getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY,
102                                                              NOTIFICATION_URL_CONNECTION_TIMEOUT_DEFAULT);
103                try {
104                    URL url = new URL(this.url);
105                    HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
106                    urlConn.setConnectTimeout(timeout);
107                    urlConn.setReadTimeout(timeout);
108                    if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
109                        handleRetry();
110                    }
111                }
112                catch (IOException ex) {
113                    handleRetry();
114                }
115            }
116            return null;
117        }
118    
119        private void handleRetry() {
120            if (retries < 3) {
121                retries++;
122                this.resetUsed();
123                queue(this, 60 * 1000);
124            }
125            else {
126                LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
127            }
128        }
129    
130        public String getUrl() {
131            return url;
132        }
133    
134    }