001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.oozie.client.OozieClient;
021    import org.apache.oozie.WorkflowActionBean;
022    import org.apache.oozie.WorkflowJobBean;
023    import org.apache.oozie.command.CommandException;
024    import org.apache.oozie.command.PreconditionException;
025    import org.apache.oozie.util.LogUtils;
026    import org.apache.oozie.util.ParamChecker;
027    import org.apache.oozie.util.XLog;
028    
029    import java.io.IOException;
030    import java.net.HttpURLConnection;
031    import java.net.URL;
032    
033    public class NotificationXCommand extends WorkflowXCommand<Void> {
034    
035        private static final String STATUS_PATTERN = "\\$status";
036        private static final String JOB_ID_PATTERN = "\\$jobId";
037        private static final String NODE_NAME_PATTERN = "\\$nodeName";
038    
039        private String url;
040        private int retries = 0;
041    
042        public NotificationXCommand(WorkflowJobBean workflow) {
043            super("job.notification", "job.notification", 0);
044            ParamChecker.notNull(workflow, "workflow");
045            LogUtils.setLogInfo(workflow, logInfo);
046            url = workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL);
047            if (url != null) {
048                url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
049                url = url.replaceAll(STATUS_PATTERN, workflow.getStatus().toString());
050            }
051        }
052    
053        public NotificationXCommand(WorkflowJobBean workflow, WorkflowActionBean action) {
054            super("action.notification", "job.notification", 0);
055            ParamChecker.notNull(workflow, "workflow");
056            ParamChecker.notNull(action, "action");
057            LogUtils.setLogInfo(workflow, logInfo);
058            LogUtils.setLogInfo(action, logInfo);
059            url = workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL);
060            if (url != null) {
061                url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
062                url = url.replaceAll(NODE_NAME_PATTERN, action.getName());
063                if (action.isComplete()) {
064                    url = url.replaceAll(STATUS_PATTERN, "T:" + action.getTransition());
065                }
066                else {
067                    url = url.replaceAll(STATUS_PATTERN, "S:" + action.getStatus().toString());
068                }
069            }
070        }
071    
072        @Override
073        protected boolean isLockRequired() {
074            return false;
075        }
076    
077        @Override
078        public String getEntityKey() {
079            return url;
080        }
081    
082        @Override
083        protected void loadState() throws CommandException {
084        }
085    
086        @Override
087        protected void verifyPrecondition() throws CommandException, PreconditionException {
088        }
089    
090        @Override
091        protected Void execute() throws CommandException {
092            //if command is requeue, the logInfo has to set to thread local Info object again
093            LogUtils.setLogInfo(logInfo);
094            if (url != null) {
095                try {
096                    URL url = new URL(this.url);
097                    HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
098                    if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
099                        handleRetry();
100                    }
101                }
102                catch (IOException ex) {
103                    handleRetry();
104                }
105            }
106            return null;
107        }
108    
109        private void handleRetry() {
110            if (retries < 3) {
111                retries++;
112                this.resetUsed();
113                queue(this, 60 * 1000);
114            }
115            else {
116                LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
117            }
118        }
119    
120        public String getUrl() {
121            return url;
122        }
123    
124    }