This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.net.HttpURLConnection;
023 import java.net.URL;
024
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.oozie.CoordinatorActionBean;
027 import org.apache.oozie.ErrorCode;
028 import org.apache.oozie.client.OozieClient;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.command.wf.NotificationXCommand;
032 import org.apache.oozie.service.Services;
033 import org.apache.oozie.util.LogUtils;
034 import org.apache.oozie.util.ParamChecker;
035 import org.apache.oozie.util.XConfiguration;
036 import org.apache.oozie.util.XLog;
037
038 /**
039 * This class will send the notification for the coordinator action
040 */
041 public class CoordActionNotificationXCommand extends CoordinatorXCommand<Void> {
042
043 private final CoordinatorActionBean actionBean;
044 private static final String STATUS_PATTERN = "\\$status";
045 private static final String ACTION_ID_PATTERN = "\\$actionId";
046
047 //this variable is package private only for test purposes
048 int retries = 0;
049
050 public CoordActionNotificationXCommand(CoordinatorActionBean actionBean) {
051 super("coord_action_notification", "coord_action_notification", 0);
052 ParamChecker.notNull(actionBean, "Action Bean");
053 this.actionBean = actionBean;
054 }
055
056 /* (non-Javadoc)
057 * @see org.apache.oozie.command.XCommand#execute()
058 */
059 @Override
060 protected Void execute() throws CommandException {
061 LOG.info("STARTED Coordinator Notification actionId=" + actionBean.getId() + " : " + actionBean.getStatus());
062 Configuration conf;
063 try {
064 conf = new XConfiguration(new StringReader(actionBean.getRunConf()));
065 }
066 catch (IOException e1) {
067 LOG.warn("Configuration parse error. read from DB :" + actionBean.getRunConf());
068 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
069 }
070 String url = conf.get(OozieClient.COORD_ACTION_NOTIFICATION_URL);
071 if (url != null) {
072 url = url.replaceAll(ACTION_ID_PATTERN, actionBean.getId());
073 url = url.replaceAll(STATUS_PATTERN, actionBean.getStatus().toString());
074 LOG.debug("Notification URL :" + url);
075 try {
076 int timeout = Services.get().getConf().getInt(
077 NotificationXCommand.NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY,
078 NotificationXCommand.NOTIFICATION_URL_CONNECTION_TIMEOUT_DEFAULT);
079 URL urlObj = new URL(url);
080 HttpURLConnection urlConn = (HttpURLConnection) urlObj.openConnection();
081 urlConn.setConnectTimeout(timeout);
082 urlConn.setReadTimeout(timeout);
083 if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
084 handleRetry(url);
085 }
086 }
087 catch (IOException ex) {
088 handleRetry(url);
089 }
090 }
091 else {
092 LOG.info("No Notification URL is defined. Therefore nothing to notify for job " + actionBean.getJobId()
093 + " action ID " + actionBean.getId());
094 }
095 LOG.info("ENDED Coordinator Notification actionId=" + actionBean.getId());
096 return null;
097 }
098
099 /**
100 * This method handles the retry for the coordinator action.
101 *
102 * @param url This is the URL where the notification has to be sent.
103 */
104 private void handleRetry(String url) {
105 if (retries < 3) {
106 retries++;
107 this.resetUsed();
108 queue(this, 60 * 1000);
109 }
110 else {
111 LOG.warn(XLog.OPS, "could not send notification [{0}]", url);
112 }
113 }
114
115 /* (non-Javadoc)
116 * @see org.apache.oozie.command.XCommand#getEntityKey()
117 */
118 @Override
119 public String getEntityKey() {
120 return actionBean.getId();
121 }
122
123 /* (non-Javadoc)
124 * @see org.apache.oozie.command.XCommand#isLockRequired()
125 */
126 @Override
127 protected boolean isLockRequired() {
128 return false;
129 }
130
131 /* (non-Javadoc)
132 * @see org.apache.oozie.command.XCommand#loadState()
133 */
134 @Override
135 protected void loadState() throws CommandException {
136 LogUtils.setLogInfo(actionBean, logInfo);
137 }
138
139 /* (non-Javadoc)
140 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
141 */
142 @Override
143 protected void verifyPrecondition() throws CommandException, PreconditionException {
144 }
145 }