001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import org.apache.oozie.client.OozieClient; 021 import org.apache.oozie.WorkflowActionBean; 022 import org.apache.oozie.WorkflowJobBean; 023 import org.apache.oozie.command.Command; 024 import org.apache.oozie.store.WorkflowStore; 025 import org.apache.oozie.store.Store; 026 import org.apache.oozie.util.XLog; 027 028 import java.io.IOException; 029 import java.net.HttpURLConnection; 030 import java.net.URL; 031 032 public class NotificationCommand extends WorkflowCommand<Void> { 033 034 private static final String STATUS_PATTERN = "\\$status"; 035 private static final String JOB_ID_PATTERN = "\\$jobId"; 036 private static final String NODE_NAME_PATTERN = "\\$nodeName"; 037 038 private String url; 039 private int retries = 0; 040 041 public NotificationCommand(WorkflowJobBean workflow) { 042 super("job.notification", "job.notification", 0, XLog.STD, false); 043 url = workflow.getWorkflowInstance().getConf().get(OozieClient.WORKFLOW_NOTIFICATION_URL); 044 if (url != null) { 045 url = url.replaceAll(JOB_ID_PATTERN, workflow.getId()); 046 url = url.replaceAll(STATUS_PATTERN, workflow.getStatus().toString()); 047 } 048 } 049 050 public NotificationCommand(WorkflowJobBean workflow, WorkflowActionBean action) { 051 super("action.notification", "job.notification", 0, XLog.STD); 052 url = workflow.getWorkflowInstance().getConf().get(OozieClient.ACTION_NOTIFICATION_URL); 053 if (url != null) { 054 url = url.replaceAll(JOB_ID_PATTERN, workflow.getId()); 055 url = url.replaceAll(NODE_NAME_PATTERN, action.getName()); 056 if (action.isComplete()) { 057 url = url.replaceAll(STATUS_PATTERN, "T:" + action.getTransition()); 058 } 059 else { 060 url = url.replaceAll(STATUS_PATTERN, "S:" + action.getStatus().toString()); 061 } 062 } 063 } 064 065 public Void call(WorkflowStore store) { 066 if (url != null) { 067 try { 068 URL url = new URL(this.url); 069 HttpURLConnection urlConn = (HttpURLConnection) url.openConnection(); 070 if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) { 071 handleRetry(); 072 } 073 } 074 catch (IOException ex) { 075 handleRetry(); 076 } 077 } 078 return null; 079 } 080 081 private void handleRetry() { 082 if (retries < 3) { 083 retries++; 084 queueCallable(this, 60 * 1000); 085 } 086 else { 087 XLog.getLog(getClass()).warn(XLog.OPS, "could not send notification [{0}]", url); 088 } 089 } 090 091 }