This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.oozie.client.OozieClient;
021 import org.apache.oozie.WorkflowActionBean;
022 import org.apache.oozie.WorkflowJobBean;
023 import org.apache.oozie.command.CommandException;
024 import org.apache.oozie.command.PreconditionException;
025 import org.apache.oozie.service.Services;
026 import org.apache.oozie.util.LogUtils;
027 import org.apache.oozie.util.ParamChecker;
028 import org.apache.oozie.util.XLog;
029
030 import java.io.IOException;
031 import java.net.HttpURLConnection;
032 import java.net.URL;
033
034 public class NotificationXCommand extends WorkflowXCommand<Void> {
035
036 public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = "oozie.notification.url.connection.timeout";
037 public static final int NOTIFICATION_URL_CONNECTION_TIMEOUT_DEFAULT = 10 * 1000; // 10 seconds
038
039 private static final String STATUS_PATTERN = "\\$status";
040 private static final String JOB_ID_PATTERN = "\\$jobId";
041 private static final String NODE_NAME_PATTERN = "\\$nodeName";
042
043 private String url;
044
045 //this variable is package private only for test purposes
046 int retries = 0;
047
048 public NotificationXCommand(WorkflowJobBean workflow) {
049 super("job.notification", "job.notification", 0);
050 ParamChecker.notNull(workflow, "workflow");
051 LogUtils.setLogInfo(workflow, logInfo);
052 url = workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL);
053 if (url != null) {
054 url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
055 url = url.replaceAll(STATUS_PATTERN, workflow.getStatus().toString());
056 }
057 }
058
059 public NotificationXCommand(WorkflowJobBean workflow, WorkflowActionBean action) {
060 super("action.notification", "job.notification", 0);
061 ParamChecker.notNull(workflow, "workflow");
062 ParamChecker.notNull(action, "action");
063 LogUtils.setLogInfo(workflow, logInfo);
064 LogUtils.setLogInfo(action, logInfo);
065 url = workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL);
066 if (url != null) {
067 url = url.replaceAll(JOB_ID_PATTERN, workflow.getId());
068 url = url.replaceAll(NODE_NAME_PATTERN, action.getName());
069 if (action.isComplete()) {
070 url = url.replaceAll(STATUS_PATTERN, "T:" + action.getTransition());
071 }
072 else {
073 url = url.replaceAll(STATUS_PATTERN, "S:" + action.getStatus().toString());
074 }
075 }
076 }
077
078 @Override
079 protected boolean isLockRequired() {
080 return false;
081 }
082
083 @Override
084 public String getEntityKey() {
085 return url;
086 }
087
088 @Override
089 protected void loadState() throws CommandException {
090 }
091
092 @Override
093 protected void verifyPrecondition() throws CommandException, PreconditionException {
094 }
095
096 @Override
097 protected Void execute() throws CommandException {
098 //if command is requeue, the logInfo has to set to thread local Info object again
099 LogUtils.setLogInfo(logInfo);
100 if (url != null) {
101 int timeout = Services.get().getConf().getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY,
102 NOTIFICATION_URL_CONNECTION_TIMEOUT_DEFAULT);
103 try {
104 URL url = new URL(this.url);
105 HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
106 urlConn.setConnectTimeout(timeout);
107 urlConn.setReadTimeout(timeout);
108 if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
109 handleRetry();
110 }
111 }
112 catch (IOException ex) {
113 handleRetry();
114 }
115 }
116 return null;
117 }
118
119 private void handleRetry() {
120 if (retries < 3) {
121 retries++;
122 this.resetUsed();
123 queue(this, 60 * 1000);
124 }
125 else {
126 LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
127 }
128 }
129
130 public String getUrl() {
131 return url;
132 }
133
134 }