001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.oozie.client.OozieClient;
021    import org.apache.oozie.WorkflowActionBean;
022    import org.apache.oozie.WorkflowJobBean;
023    import org.apache.oozie.command.Command;
024    import org.apache.oozie.store.WorkflowStore;
025    import org.apache.oozie.store.Store;
026    import org.apache.oozie.util.XLog;
027    
028    import java.io.IOException;
029    import java.net.HttpURLConnection;
030    import java.net.URL;
031    
032    public class NotificationCommand extends WorkflowCommand<Void> {
033    
034        private static final String STATUS_PATTERN = "\\$status";
035        private static final String JOB_ID_PATTERN = "\\$jobId";
036        private static final String NODE_NAME_PATTERN = "\\$nodeName";
037    
038        private String url;
039        private int retries = 0;
040    
041        public NotificationCommand(WorkflowJobBean workflow) {
042            super("job.notification", "job.notification", 0, XLog.STD, false);
043            url = workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL);
044            if (url != null) {
045                url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
046                url = url.replaceAll(STATUS_PATTERN, workflow.getStatus().toString());
047            }
048        }
049    
050        public NotificationCommand(WorkflowJobBean workflow, WorkflowActionBean action) {
051            super("action.notification", "job.notification", 0, XLog.STD);
052            url = workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL);
053            if (url != null) {
054                url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
055                url = url.replaceAll(NODE_NAME_PATTERN, action.getName());
056                if (action.isComplete()) {
057                    url = url.replaceAll(STATUS_PATTERN, "T:" + action.getTransition());
058                }
059                else {
060                    url = url.replaceAll(STATUS_PATTERN, "S:" + action.getStatus().toString());
061                }
062            }
063        }
064    
065        public Void call(WorkflowStore store) {
066            if (url != null) {
067                try {
068                    URL url = new URL(this.url);
069                    HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
070                    if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
071                        handleRetry();
072                    }
073                }
074                catch (IOException ex) {
075                    handleRetry();
076                }
077            }
078            return null;
079        }
080    
081        private void handleRetry() {
082            if (retries < 3) {
083                retries++;
084                queueCallable(this, 60 * 1000);
085            }
086            else {
087                XLog.getLog(getClass()).warn(XLog.OPS, "could not send notification [{0}]", url);
088            }
089        }
090    
091    }