001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.workflow.lite; 019 020import org.apache.hadoop.io.Writable; 021import org.apache.hadoop.util.ReflectionUtils; 022import org.apache.oozie.workflow.WorkflowApp; 023import org.apache.oozie.workflow.WorkflowException; 024import org.apache.oozie.util.ParamChecker; 025import org.apache.oozie.util.XLog; 026import org.apache.oozie.ErrorCode; 027 028import java.io.DataInput; 029import java.io.DataOutput; 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.Collection; 033import java.util.Collections; 034import java.util.LinkedHashMap; 035import java.util.List; 036import java.util.Map; 037 038//TODO javadoc 039public class LiteWorkflowApp implements Writable, WorkflowApp { 040 private String name; 041 private String definition; 042 private Map<String, NodeDef> nodesMap = new LinkedHashMap<String, NodeDef>(); 043 private boolean complete = false; 044 045 LiteWorkflowApp() { 046 } 047 048 public LiteWorkflowApp(String name, String definition, StartNodeDef startNode) { 049 this.name = ParamChecker.notEmpty(name, "name"); 050 this.definition = ParamChecker.notEmpty(definition, "definition"); 051 nodesMap.put(StartNodeDef.START, startNode); 052 } 053 054 public boolean equals(LiteWorkflowApp other) { 055 return !(other == null || getClass() != other.getClass() || !getName().equals(other.getName())); 056 } 057 058 public int hashCode() { 059 return name.hashCode(); 060 } 061 062 public LiteWorkflowApp addNode(NodeDef node) throws WorkflowException { 063 ParamChecker.notNull(node, "node"); 064 if (complete) { 065 throw new WorkflowException(ErrorCode.E0704, name); 066 } 067 if (nodesMap.containsKey(node.getName())) { 068 throw new WorkflowException(ErrorCode.E0705, node.getName()); 069 } 070 if (node.getTransitions().contains(node.getName())) { 071 throw new WorkflowException(ErrorCode.E0706, node.getName()); 072 } 073 nodesMap.put(node.getName(), node); 074 if (node instanceof EndNodeDef) { 075 complete = true; 076 } 077 return this; 078 } 079 080 public String getName() { 081 return name; 082 } 083 084 public String getDefinition() { 085 return definition; 086 } 087 088 public Collection<NodeDef> getNodeDefs() { 089 return Collections.unmodifiableCollection(nodesMap.values()); 090 } 091 092 public NodeDef getNode(String name) { 093 return nodesMap.get(name); 094 } 095 096 public void validateWorkflowIntegrity() { 097 //TODO traverse wf, ensure there are not cycles, no open paths, and one END 098 } 099 100 public void validateTransition(String name, String transition) { 101 ParamChecker.notEmpty(name, "name"); 102 ParamChecker.notEmpty(transition, "transition"); 103 NodeDef node = getNode(name); 104 if (!node.getTransitions().contains(transition)) { 105 throw new IllegalArgumentException("invalid transition"); 106 } 107 } 108 109 110 @Override 111 public void write(DataOutput dataOutput) throws IOException { 112 dataOutput.writeUTF(name); 113 //dataOutput.writeUTF(definition); 114 //writeUTF() has limit 65535, so split long string to multiple short strings 115 List<String> defList = divideStr(definition); 116 dataOutput.writeInt(defList.size()); 117 for (String d : defList) { 118 dataOutput.writeUTF(d); 119 } 120 dataOutput.writeInt(nodesMap.size()); 121 for (NodeDef n : getNodeDefs()) { 122 dataOutput.writeUTF(n.getClass().getName()); 123 n.write(dataOutput); 124 } 125 } 126 127 /** 128 * To split long string to a list of smaller strings. 129 * 130 * @param str 131 * @return List 132 */ 133 private List<String> divideStr(String str) { 134 List<String> list = new ArrayList<String>(); 135 int len = 20000; 136 int strlen = str.length(); 137 int start = 0; 138 int end = len; 139 140 while (end < strlen) { 141 list.add(str.substring(start, end)); 142 start = end; 143 end += len; 144 } 145 146 if (strlen <= end) { 147 list.add(str.substring(start, strlen)); 148 } 149 return list; 150 } 151 152 @Override 153 public void readFields(DataInput dataInput) throws IOException { 154 name = dataInput.readUTF(); 155 //definition = dataInput.readUTF(); 156 //read the full definition back 157 int defListSize = dataInput.readInt(); 158 StringBuilder sb = new StringBuilder(); 159 for (int i = 0; i < defListSize; i++) { 160 sb.append(dataInput.readUTF()); 161 } 162 definition = sb.toString(); 163 164 int numNodes = dataInput.readInt(); 165 for (int x = 0; x < numNodes; x++) { 166 try { 167 String nodeDefClass = dataInput.readUTF(); 168 NodeDef node = (NodeDef) ReflectionUtils.newInstance(Class.forName(nodeDefClass), null); 169 node.readFields(dataInput); 170 addNode(node); 171 } 172 catch (WorkflowException ex) { 173 throw new IOException(ex); 174 } 175 catch (ClassNotFoundException e) { 176 throw new IOException(e); 177 } 178 } 179 } 180 181}