001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import org.apache.oozie.client.OozieClient; 021 import org.apache.oozie.WorkflowActionBean; 022 import org.apache.oozie.WorkflowJobBean; 023 import org.apache.oozie.command.CommandException; 024 import org.apache.oozie.command.PreconditionException; 025 import org.apache.oozie.service.Services; 026 import org.apache.oozie.util.LogUtils; 027 import org.apache.oozie.util.ParamChecker; 028 import org.apache.oozie.util.XLog; 029 030 import java.io.IOException; 031 import java.net.HttpURLConnection; 032 import java.net.URL; 033 034 public class NotificationXCommand extends WorkflowXCommand<Void> { 035 036 public static final String NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY = "oozie.notification.url.connection.timeout"; 037 public static final int NOTIFICATION_URL_CONNECTION_TIMEOUT_DEFAULT = 10 * 1000; // 10 seconds 038 039 private static final String STATUS_PATTERN = "\\$status"; 040 private static final String JOB_ID_PATTERN = "\\$jobId"; 041 private static final String NODE_NAME_PATTERN = "\\$nodeName"; 042 043 private String url; 044 045 //this variable is package private only for test purposes 046 int retries = 0; 047 048 public NotificationXCommand(WorkflowJobBean workflow) { 049 super("job.notification", "job.notification", 0); 050 ParamChecker.notNull(workflow, "workflow"); 051 LogUtils.setLogInfo(workflow, logInfo); 052 url = workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL); 053 if (url != null) { 054 url = url.replaceAll(JOB_ID_PATTERN, workflow.getId()); 055 url = url.replaceAll(STATUS_PATTERN, workflow.getStatus().toString()); 056 } 057 } 058 059 public NotificationXCommand(WorkflowJobBean workflow, WorkflowActionBean action) { 060 super("action.notification", "job.notification", 0); 061 ParamChecker.notNull(workflow, "workflow"); 062 ParamChecker.notNull(action, "action"); 063 LogUtils.setLogInfo(workflow, logInfo); 064 LogUtils.setLogInfo(action, logInfo); 065 url = workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL); 066 if (url != null) { 067 url = url.replaceAll(JOB_ID_PATTERN, workflow.getId()); 068 url = url.replaceAll(NODE_NAME_PATTERN, action.getName()); 069 if (action.isComplete()) { 070 url = url.replaceAll(STATUS_PATTERN, "T:" + action.getTransition()); 071 } 072 else { 073 url = url.replaceAll(STATUS_PATTERN, "S:" + action.getStatus().toString()); 074 } 075 } 076 } 077 078 @Override 079 protected boolean isLockRequired() { 080 return false; 081 } 082 083 @Override 084 public String getEntityKey() { 085 return url; 086 } 087 088 @Override 089 protected void loadState() throws CommandException { 090 } 091 092 @Override 093 protected void verifyPrecondition() throws CommandException, PreconditionException { 094 } 095 096 @Override 097 protected Void execute() throws CommandException { 098 //if command is requeue, the logInfo has to set to thread local Info object again 099 LogUtils.setLogInfo(logInfo); 100 if (url != null) { 101 int timeout = Services.get().getConf().getInt(NOTIFICATION_URL_CONNECTION_TIMEOUT_KEY, 102 NOTIFICATION_URL_CONNECTION_TIMEOUT_DEFAULT); 103 try { 104 URL url = new URL(this.url); 105 HttpURLConnection urlConn = (HttpURLConnection) url.openConnection(); 106 urlConn.setConnectTimeout(timeout); 107 urlConn.setReadTimeout(timeout); 108 if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) { 109 handleRetry(); 110 } 111 } 112 catch (IOException ex) { 113 handleRetry(); 114 } 115 } 116 return null; 117 } 118 119 private void handleRetry() { 120 if (retries < 3) { 121 retries++; 122 this.resetUsed(); 123 queue(this, 60 * 1000); 124 } 125 else { 126 LOG.warn(XLog.OPS, "could not send notification [{0}]", url); 127 } 128 } 129 130 public String getUrl() { 131 return url; 132 } 133 134 }