001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.workflow.lite; 019 020 import org.apache.hadoop.io.Writable; 021 import org.apache.hadoop.util.ReflectionUtils; 022 import org.apache.oozie.workflow.WorkflowApp; 023 import org.apache.oozie.workflow.WorkflowException; 024 import org.apache.oozie.util.ParamChecker; 025 import org.apache.oozie.util.XLog; 026 import org.apache.oozie.ErrorCode; 027 028 import java.io.DataInput; 029 import java.io.DataOutput; 030 import java.io.IOException; 031 import java.util.ArrayList; 032 import java.util.Collection; 033 import java.util.Collections; 034 import java.util.LinkedHashMap; 035 import java.util.List; 036 import java.util.Map; 037 038 //TODO javadoc 039 public class LiteWorkflowApp implements Writable, WorkflowApp { 040 private String name; 041 private String definition; 042 private Map<String, NodeDef> nodesMap = new LinkedHashMap<String, NodeDef>(); 043 private boolean complete = false; 044 045 LiteWorkflowApp() { 046 } 047 048 public LiteWorkflowApp(String name, String definition, StartNodeDef startNode) { 049 this.name = ParamChecker.notEmpty(name, "name"); 050 this.definition = ParamChecker.notEmpty(definition, "definition"); 051 nodesMap.put(StartNodeDef.START, startNode); 052 } 053 054 public boolean equals(LiteWorkflowApp other) { 055 return !(other == null || getClass() != other.getClass() || !getName().equals(other.getName())); 056 } 057 058 public int hashCode() { 059 return name.hashCode(); 060 } 061 062 public LiteWorkflowApp addNode(NodeDef node) throws WorkflowException { 063 ParamChecker.notNull(node, "node"); 064 if (complete) { 065 throw new WorkflowException(ErrorCode.E0704, 066 XLog.format("Definition [{0}] already complete", name)); 067 } 068 if (nodesMap.containsKey(node.getName())) { 069 throw new WorkflowException(ErrorCode.E0705, 070 XLog.format("Node [{0}] already defined", node.getName())); 071 } 072 if (node.getTransitions().contains(node.getName())) { 073 throw new WorkflowException(ErrorCode.E0706, 074 XLog.format("Node [{0}] cannot transition to itself", node.getName())); 075 } 076 nodesMap.put(node.getName(), node); 077 if (node instanceof EndNodeDef) { 078 complete = true; 079 } 080 return this; 081 } 082 083 public String getName() { 084 return name; 085 } 086 087 public String getDefinition() { 088 return definition; 089 } 090 091 public Collection<NodeDef> getNodeDefs() { 092 return Collections.unmodifiableCollection(nodesMap.values()); 093 } 094 095 public NodeDef getNode(String name) { 096 return nodesMap.get(name); 097 } 098 099 public void validateWorkflowIntegrity() { 100 //TODO traverse wf, ensure there are not cycles, no open paths, and one END 101 } 102 103 public void validateTransition(String name, String transition) { 104 ParamChecker.notEmpty(name, "name"); 105 ParamChecker.notEmpty(transition, "transition"); 106 NodeDef node = getNode(name); 107 if (!node.getTransitions().contains(transition)) { 108 throw new IllegalArgumentException("invalid transition"); 109 } 110 } 111 112 113 @Override 114 public void write(DataOutput dataOutput) throws IOException { 115 dataOutput.writeUTF(name); 116 //dataOutput.writeUTF(definition); 117 //writeUTF() has limit 65535, so split long string to multiple short strings 118 List<String> defList = divideStr(definition); 119 dataOutput.writeInt(defList.size()); 120 for (String d : defList) { 121 dataOutput.writeUTF(d); 122 } 123 dataOutput.writeInt(nodesMap.size()); 124 for (NodeDef n : getNodeDefs()) { 125 dataOutput.writeUTF(n.getClass().getName()); 126 n.write(dataOutput); 127 } 128 } 129 130 /** 131 * To split long string to a list of smaller strings. 132 * 133 * @param str 134 * @return List 135 */ 136 private List<String> divideStr(String str) { 137 List<String> list = new ArrayList<String>(); 138 int len = 20000; 139 int strlen = str.length(); 140 int start = 0; 141 int end = len; 142 143 while (end < strlen) { 144 list.add(str.substring(start, end)); 145 start = end; 146 end += len; 147 } 148 149 if (strlen <= end) { 150 list.add(str.substring(start, strlen)); 151 } 152 return list; 153 } 154 155 @Override 156 public void readFields(DataInput dataInput) throws IOException { 157 name = dataInput.readUTF(); 158 //definition = dataInput.readUTF(); 159 //read the full definition back 160 int defListSize = dataInput.readInt(); 161 StringBuilder sb = new StringBuilder(); 162 for (int i = 0; i < defListSize; i++) { 163 sb.append(dataInput.readUTF()); 164 } 165 definition = sb.toString(); 166 167 int numNodes = dataInput.readInt(); 168 for (int x = 0; x < numNodes; x++) { 169 try { 170 String nodeDefClass = dataInput.readUTF(); 171 NodeDef node = (NodeDef) ReflectionUtils.newInstance(Class.forName(nodeDefClass), null); 172 node.readFields(dataInput); 173 addNode(node); 174 } 175 catch (WorkflowException ex) { 176 throw new IOException(ex); 177 } 178 catch (ClassNotFoundException e) { 179 throw new IOException(e); 180 } 181 } 182 } 183 184 }