001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.workflow.lite; 020 021import org.apache.hadoop.io.Writable; 022import org.apache.hadoop.util.ReflectionUtils; 023import org.apache.oozie.util.StringSerializationUtil; 024import org.apache.oozie.workflow.WorkflowApp; 025import org.apache.oozie.workflow.WorkflowException; 026import org.apache.oozie.util.ParamChecker; 027import org.apache.oozie.ErrorCode; 028 029import java.io.DataInput; 030import java.io.DataOutput; 031import java.io.IOException; 032import java.util.Collection; 033import java.util.Collections; 034import java.util.LinkedHashMap; 035import java.util.Map; 036 037//TODO javadoc 038public class LiteWorkflowApp implements Writable, WorkflowApp { 039 /** 040 * Serialization of strings longer than 65k changed. This flag marks which method to use during reading. 041 */ 042 public static final int NEW_SERIALIZATION_METHOD_FLAG = -1; 043 private String name; 044 private String definition; 045 private Map<String, NodeDef> nodesMap = new LinkedHashMap<String, NodeDef>(); 046 private boolean complete = false; 047 048 LiteWorkflowApp() { 049 } 050 051 public LiteWorkflowApp(String name, String definition, StartNodeDef startNode) { 052 this.name = ParamChecker.notEmpty(name, "name"); 053 this.definition = ParamChecker.notEmpty(definition, "definition"); 054 nodesMap.put(StartNodeDef.START, startNode); 055 } 056 057 public boolean equals(LiteWorkflowApp other) { 058 return !(other == null || getClass() != other.getClass() || !getName().equals(other.getName())); 059 } 060 061 public int hashCode() { 062 return name.hashCode(); 063 } 064 065 public LiteWorkflowApp addNode(NodeDef node) throws WorkflowException { 066 ParamChecker.notNull(node, "node"); 067 if (complete) { 068 throw new WorkflowException(ErrorCode.E0704, name); 069 } 070 if (nodesMap.containsKey(node.getName())) { 071 throw new WorkflowException(ErrorCode.E0705, node.getName()); 072 } 073 if (node.getTransitions().contains(node.getName())) { 074 throw new WorkflowException(ErrorCode.E0706, node.getName()); 075 } 076 nodesMap.put(node.getName(), node); 077 if (node instanceof EndNodeDef) { 078 complete = true; 079 } 080 return this; 081 } 082 083 public String getName() { 084 return name; 085 } 086 087 public String getDefinition() { 088 return definition; 089 } 090 091 public Collection<NodeDef> getNodeDefs() { 092 return Collections.unmodifiableCollection(nodesMap.values()); 093 } 094 095 public NodeDef getNode(String name) { 096 return nodesMap.get(name); 097 } 098 099 public void validateWorkflowIntegrity() { 100 //TODO traverse wf, ensure there are not cycles, no open paths, and one END 101 } 102 103 public void validateTransition(String name, String transition) { 104 ParamChecker.notEmpty(name, "name"); 105 ParamChecker.notEmpty(transition, "transition"); 106 NodeDef node = getNode(name); 107 if (!node.getTransitions().contains(transition)) { 108 throw new IllegalArgumentException("invalid transition"); 109 } 110 } 111 112 113 @Override 114 public void write(DataOutput dataOutput) throws IOException { 115 dataOutput.writeUTF(name); 116 // write out -1 as a marker to use StringSerializationUtil. Previously it was split to 20k long bits in a list. 117 dataOutput.writeInt(NEW_SERIALIZATION_METHOD_FLAG); 118 StringSerializationUtil.writeString(dataOutput, definition); 119 dataOutput.writeInt(nodesMap.size()); 120 for (NodeDef n : getNodeDefs()) { 121 dataOutput.writeUTF(n.getClass().getName()); 122 n.write(dataOutput); 123 } 124 } 125 126 @Override 127 public void readFields(DataInput dataInput) throws IOException { 128 name = dataInput.readUTF(); 129 //read the full definition back 130 int definitionListFlag = dataInput.readInt(); 131 if(definitionListFlag > NEW_SERIALIZATION_METHOD_FLAG) { 132 // negative number marking the usage of StringSerializationUtil 133 // positive number is the length of the array the String was broken into. 134 StringBuilder sb = new StringBuilder(); 135 for (int i = 0; i < definitionListFlag; i++) { 136 sb.append(dataInput.readUTF()); 137 } 138 definition = sb.toString(); 139 } else { 140 definition = StringSerializationUtil.readString(dataInput); 141 } 142 int numNodes = dataInput.readInt(); 143 for (int x = 0; x < numNodes; x++) { 144 try { 145 String nodeDefClass = dataInput.readUTF(); 146 NodeDef node = (NodeDef) ReflectionUtils.newInstance(Class.forName(nodeDefClass), null); 147 node.readFields(dataInput); 148 addNode(node); 149 } 150 catch (WorkflowException ex) { 151 throw new IOException(ex); 152 } 153 catch (ClassNotFoundException e) { 154 throw new IOException(e); 155 } 156 } 157 } 158 159}