001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.workflow.lite; 020 021import org.apache.hadoop.io.Writable; 022import org.apache.hadoop.util.ReflectionUtils; 023import org.apache.oozie.workflow.WorkflowApp; 024import org.apache.oozie.workflow.WorkflowException; 025import org.apache.oozie.util.ParamChecker; 026import org.apache.oozie.util.XLog; 027import org.apache.oozie.ErrorCode; 028 029import java.io.DataInput; 030import java.io.DataOutput; 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.Collection; 034import java.util.Collections; 035import java.util.LinkedHashMap; 036import java.util.List; 037import java.util.Map; 038 039//TODO javadoc 040public class LiteWorkflowApp implements Writable, WorkflowApp { 041 private String name; 042 private String definition; 043 private Map<String, NodeDef> nodesMap = new LinkedHashMap<String, NodeDef>(); 044 private boolean complete = false; 045 046 LiteWorkflowApp() { 047 } 048 049 public LiteWorkflowApp(String name, String definition, StartNodeDef startNode) { 050 this.name = ParamChecker.notEmpty(name, "name"); 051 this.definition = ParamChecker.notEmpty(definition, "definition"); 052 nodesMap.put(StartNodeDef.START, startNode); 053 } 054 055 public boolean equals(LiteWorkflowApp other) { 056 return !(other == null || getClass() != other.getClass() || !getName().equals(other.getName())); 057 } 058 059 public int hashCode() { 060 return name.hashCode(); 061 } 062 063 public LiteWorkflowApp addNode(NodeDef node) throws WorkflowException { 064 ParamChecker.notNull(node, "node"); 065 if (complete) { 066 throw new WorkflowException(ErrorCode.E0704, name); 067 } 068 if (nodesMap.containsKey(node.getName())) { 069 throw new WorkflowException(ErrorCode.E0705, node.getName()); 070 } 071 if (node.getTransitions().contains(node.getName())) { 072 throw new WorkflowException(ErrorCode.E0706, node.getName()); 073 } 074 nodesMap.put(node.getName(), node); 075 if (node instanceof EndNodeDef) { 076 complete = true; 077 } 078 return this; 079 } 080 081 public String getName() { 082 return name; 083 } 084 085 public String getDefinition() { 086 return definition; 087 } 088 089 public Collection<NodeDef> getNodeDefs() { 090 return Collections.unmodifiableCollection(nodesMap.values()); 091 } 092 093 public NodeDef getNode(String name) { 094 return nodesMap.get(name); 095 } 096 097 public void validateWorkflowIntegrity() { 098 //TODO traverse wf, ensure there are not cycles, no open paths, and one END 099 } 100 101 public void validateTransition(String name, String transition) { 102 ParamChecker.notEmpty(name, "name"); 103 ParamChecker.notEmpty(transition, "transition"); 104 NodeDef node = getNode(name); 105 if (!node.getTransitions().contains(transition)) { 106 throw new IllegalArgumentException("invalid transition"); 107 } 108 } 109 110 111 @Override 112 public void write(DataOutput dataOutput) throws IOException { 113 dataOutput.writeUTF(name); 114 //dataOutput.writeUTF(definition); 115 //writeUTF() has limit 65535, so split long string to multiple short strings 116 List<String> defList = divideStr(definition); 117 dataOutput.writeInt(defList.size()); 118 for (String d : defList) { 119 dataOutput.writeUTF(d); 120 } 121 dataOutput.writeInt(nodesMap.size()); 122 for (NodeDef n : getNodeDefs()) { 123 dataOutput.writeUTF(n.getClass().getName()); 124 n.write(dataOutput); 125 } 126 } 127 128 /** 129 * To split long string to a list of smaller strings. 130 * 131 * @param str 132 * @return List 133 */ 134 private List<String> divideStr(String str) { 135 List<String> list = new ArrayList<String>(); 136 int len = 20000; 137 int strlen = str.length(); 138 int start = 0; 139 int end = len; 140 141 while (end < strlen) { 142 list.add(str.substring(start, end)); 143 start = end; 144 end += len; 145 } 146 147 if (strlen <= end) { 148 list.add(str.substring(start, strlen)); 149 } 150 return list; 151 } 152 153 @Override 154 public void readFields(DataInput dataInput) throws IOException { 155 name = dataInput.readUTF(); 156 //definition = dataInput.readUTF(); 157 //read the full definition back 158 int defListSize = dataInput.readInt(); 159 StringBuilder sb = new StringBuilder(); 160 for (int i = 0; i < defListSize; i++) { 161 sb.append(dataInput.readUTF()); 162 } 163 definition = sb.toString(); 164 165 int numNodes = dataInput.readInt(); 166 for (int x = 0; x < numNodes; x++) { 167 try { 168 String nodeDefClass = dataInput.readUTF(); 169 NodeDef node = (NodeDef) ReflectionUtils.newInstance(Class.forName(nodeDefClass), null); 170 node.readFields(dataInput); 171 addNode(node); 172 } 173 catch (WorkflowException ex) { 174 throw new IOException(ex); 175 } 176 catch (ClassNotFoundException e) { 177 throw new IOException(e); 178 } 179 } 180 } 181 182}