001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.util.graph; 020 021import com.google.common.base.Charsets; 022import com.google.common.base.Preconditions; 023import com.google.common.base.Strings; 024import org.apache.oozie.client.WorkflowAction; 025import org.apache.oozie.client.WorkflowJob; 026import org.apache.oozie.util.Instrumentation; 027import org.xml.sax.Attributes; 028import org.xml.sax.SAXException; 029import org.xml.sax.helpers.DefaultHandler; 030 031import javax.imageio.ImageIO; 032import java.awt.image.BufferedImage; 033import java.io.IOException; 034import java.io.OutputStream; 035import java.util.LinkedHashMap; 036import java.util.Locale; 037import java.util.Map; 038 039public class WorkflowGraphHandler extends DefaultHandler { 040 private OutputStream out; 041 private final OutputFormat outputFormat; 042 private final WorkflowJob job; 043 private boolean showKill; 044 private final GraphRenderer graphRenderer; 045 private final Map<String, WorkflowActionNode> tags = new LinkedHashMap<>(); 046 private final WorkflowParseState state = new WorkflowParseState(); 047 048 WorkflowGraphHandler(final OutputStream out, 049 final OutputFormat outputFormat, 050 final WorkflowJob job, 051 final boolean showKill, 052 final GraphRenderer graphRenderer) { 053 this.out = out; 054 this.job = job; 055 this.showKill = showKill; 056 this.graphRenderer = graphRenderer; 057 this.outputFormat = outputFormat; 058 } 059 060 @Override 061 public void startDocument() throws SAXException { 062 // NOP 063 } 064 065 @Override 066 public void endDocument() throws SAXException { 067 final Instrumentation.Cron cron = new Instrumentation.Cron(); 068 cron.start(); 069 070 if (tags.isEmpty()) { 071 // Nothing to do here! 072 return; 073 } 074 075 final Map<String, WorkflowAction> workflowActions = fillWorkflowActions(); 076 for (final Map.Entry<String, WorkflowActionNode> entry : tags.entrySet()) { 077 final String name = entry.getKey(); 078 final WorkflowActionNode parent = entry.getValue(); 079 if (workflowActions.containsKey(name)) { 080 parent.setStatus(workflowActions.get(name).getStatus()); 081 } 082 083 graphRenderer.addNode(parent); 084 085 for (final Map.Entry<String, Boolean> arc : parent.getArcs().entrySet()) { 086 if (!showKill && arc.getValue() && tags.get(arc.getKey()).getType().equals("kill")) { 087 // Don't show kill node (assumption: only error goes to kill node; 088 // No ok goes to kill node) 089 continue; 090 } 091 092 final WorkflowActionNode child = tags.get(arc.getKey()); 093 if (child != null) { 094 if (workflowActions.containsKey(arc.getKey())) { 095 child.setStatus(workflowActions.get(arc.getKey()).getStatus()); 096 } 097 098 graphRenderer.addEdge(parent, child); 099 } 100 } 101 102 graphRenderer.persist(parent); 103 } 104 105 switch (outputFormat) { 106 case PNG: 107 renderAndWritePng(); 108 break; 109 case DOT: 110 renderAndWriteDot(); 111 break; 112 case SVG: 113 renderAndWriteSvg(); 114 break; 115 default: 116 throw new IllegalArgumentException(String.format("Unknown outputFormat %s", outputFormat)); 117 } 118 } 119 120 private void renderAndWritePng() throws SAXException { 121 final BufferedImage source = graphRenderer.renderPng(); 122 123 try { 124 ImageIO.write(source, "png", out); 125 } catch (final IOException ioe) { 126 throw new SAXException(ioe); 127 } finally { 128 source.flush(); 129 } 130 } 131 132 private void renderAndWriteDot() throws SAXException { 133 renderStringContent(graphRenderer.renderDot()); 134 } 135 136 private void renderAndWriteSvg() throws SAXException { 137 renderStringContent(graphRenderer.renderSvg()); 138 } 139 140 private void renderStringContent(final String content) throws SAXException { 141 Preconditions.checkState(!Strings.isNullOrEmpty(content), "No output generated from graph."); 142 143 try { 144 out.write(content.getBytes(Charsets.UTF_8)); 145 } catch (final IOException ioe) { 146 throw new SAXException(ioe); 147 } 148 } 149 150 private Map<String, WorkflowAction> fillWorkflowActions() { 151 final Map<String, WorkflowAction> workflowActions = new LinkedHashMap<>(); 152 153 boolean found = false; 154 for (final WorkflowAction wfAction : job.getActions()) { 155 workflowActions.put(wfAction.getName(), wfAction); 156 if (!found) { 157 switch (wfAction.getStatus()) { 158 case KILLED: 159 case ERROR: 160 case FAILED: 161 showKill = true; // Assuming on error the workflow eventually ends with kill node 162 found = true; 163 break; 164 default: 165 // Look further 166 break; 167 } 168 } 169 } 170 171 return workflowActions; 172 } 173 174 175 @Override 176 public void startElement(final String namespaceURI, 177 final String localName, 178 final String qName, 179 final Attributes atts) 180 throws SAXException { 181 if (localName.equalsIgnoreCase("start")) { 182 final String start = localName.toLowerCase(Locale.getDefault()); 183 if (!tags.containsKey(start)) { 184 final WorkflowActionNode v = new WorkflowActionNode(start, start); 185 v.addArc(atts.getValue("to")); 186 tags.put(start, v); 187 } 188 } else if (localName.equalsIgnoreCase("action")) { 189 state.action = atts.getValue("name"); 190 } else if (state.action != null && state.actionType == null) { 191 state.actionType = localName.toLowerCase(Locale.getDefault()); 192 } else if (localName.equalsIgnoreCase("ok") && state.action != null && state.actionOK == null) { 193 state.actionOK = atts.getValue("to"); 194 } else if (localName.equalsIgnoreCase("error") && state.action != null && state.actionErr == null) { 195 state.actionErr = atts.getValue("to"); 196 } else if (localName.equalsIgnoreCase("fork")) { 197 state.fork = atts.getValue("name"); 198 if (!tags.containsKey(state.fork)) { 199 tags.put(state.fork, new WorkflowActionNode(state.fork, localName.toLowerCase(Locale.getDefault()))); 200 } 201 } else if (localName.equalsIgnoreCase("path")) { 202 tags.get(state.fork).addArc(atts.getValue("start")); 203 } else if (localName.equalsIgnoreCase("join")) { 204 final String join = atts.getValue("name"); 205 if (!tags.containsKey(join)) { 206 final WorkflowActionNode v = new WorkflowActionNode(join, localName.toLowerCase(Locale.getDefault())); 207 v.addArc(atts.getValue("to")); 208 tags.put(join, v); 209 } 210 } else if (localName.equalsIgnoreCase("decision")) { 211 state.decision = atts.getValue("name"); 212 if (!tags.containsKey(state.decision)) { 213 tags.put(state.decision, new WorkflowActionNode(state.decision, localName.toLowerCase(Locale.getDefault()))); 214 } 215 } else if (localName.equalsIgnoreCase("case") 216 || localName.equalsIgnoreCase("default")) { 217 tags.get(state.decision).addArc(atts.getValue("to")); 218 } else if (localName.equalsIgnoreCase("kill") 219 || localName.equalsIgnoreCase("end")) { 220 final String name = atts.getValue("name"); 221 if (!tags.containsKey(name)) { 222 tags.put(name, new WorkflowActionNode(name, localName.toLowerCase(Locale.getDefault()))); 223 } 224 } 225 } 226 227 228 @Override 229 public void endElement(final String namespaceURI, 230 final String localName, 231 final String qName) 232 throws SAXException { 233 if (localName.equalsIgnoreCase("action")) { 234 tags.put(state.action, new WorkflowActionNode(state.action, state.actionType)); 235 tags.get(state.action).addArc(state.actionOK); 236 tags.get(state.action).addArc(state.actionErr, true); 237 238 state.reset(); 239 } 240 } 241 242 private static class WorkflowParseState { 243 private String action; 244 private String actionOK; 245 private String actionErr; 246 private String actionType; 247 private String fork; 248 private String decision; 249 250 public void reset() { 251 action = null; 252 actionOK = null; 253 actionErr = null; 254 actionType = null; 255 } 256 } 257}