This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.net.HttpURLConnection;
023 import java.net.URL;
024
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.oozie.CoordinatorActionBean;
027 import org.apache.oozie.ErrorCode;
028 import org.apache.oozie.client.OozieClient;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.store.CoordinatorStore;
031 import org.apache.oozie.store.StoreException;
032 import org.apache.oozie.util.XConfiguration;
033 import org.apache.oozie.util.XLog;
034
035 public class CoordActionNotification extends CoordinatorCommand<Void> {
036
037 private CoordinatorActionBean actionBean;
038 private static final String STATUS_PATTERN = "\\$status";
039 private static final String ACTION_ID_PATTERN = "\\$actionId";
040
041 private int retries = 0;
042 private final XLog log = XLog.getLog(getClass());
043
044 public CoordActionNotification(CoordinatorActionBean actionBean) {
045 super("coord_action_notification", "coord_action_notification", 0,
046 XLog.STD);
047 this.actionBean = actionBean;
048 }
049
050 @Override
051 protected Void call(CoordinatorStore store) throws StoreException,
052 CommandException {
053 setLogInfo(actionBean);
054 log.info("STARTED Coordinator Notification actionId="
055 + actionBean.getId() + " : " + actionBean.getStatus());
056 Configuration conf;
057 try {
058 conf = new XConfiguration(new StringReader(actionBean.getRunConf()));
059 }
060 catch (IOException e1) {
061 log.warn("Configuration parse error. read from DB :"
062 + actionBean.getRunConf());
063 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
064 }
065 String url = conf.get(OozieClient.COORD_ACTION_NOTIFICATION_URL);
066 if (url != null) {
067 url = url.replaceAll(ACTION_ID_PATTERN, actionBean.getId());
068 url = url.replaceAll(STATUS_PATTERN, actionBean.getStatus()
069 .toString());
070 log.debug("Notification URL :" + url);
071 try {
072 URL urlObj = new URL(url);
073 HttpURLConnection urlConn = (HttpURLConnection) urlObj
074 .openConnection();
075 if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
076 handleRetry(url);
077 }
078 }
079 catch (IOException ex) {
080 handleRetry(url);
081 }
082 }
083 else {
084 log
085 .info("No Notification URL is defined. Therefore nothing to notify for job "
086 + actionBean.getJobId()
087 + " action ID "
088 + actionBean.getId());
089 // System.out.println("No Notification URL is defined. Therefore nothing is notified");
090 }
091 log.info("ENDED Coordinator Notification actionId="
092 + actionBean.getId());
093 return null;
094 }
095
096 private void handleRetry(String url) {
097 if (retries < 3) {
098 retries++;
099 queueCallable(this, 60 * 1000);
100 }
101 else {
102 XLog.getLog(getClass()).warn(XLog.OPS,
103 "could not send notification [{0}]", url);
104 }
105 }
106
107 }