This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.workflow.lite;
019
020 import org.apache.hadoop.io.Writable;
021 import org.apache.hadoop.util.ReflectionUtils;
022 import org.apache.oozie.workflow.WorkflowApp;
023 import org.apache.oozie.workflow.WorkflowException;
024 import org.apache.oozie.util.ParamChecker;
025 import org.apache.oozie.util.XLog;
026 import org.apache.oozie.ErrorCode;
027
028 import java.io.DataInput;
029 import java.io.DataOutput;
030 import java.io.IOException;
031 import java.util.ArrayList;
032 import java.util.Collection;
033 import java.util.Collections;
034 import java.util.LinkedHashMap;
035 import java.util.List;
036 import java.util.Map;
037
038 //TODO javadoc
039 public class LiteWorkflowApp implements Writable, WorkflowApp {
040 private String name;
041 private String definition;
042 private Map<String, NodeDef> nodesMap = new LinkedHashMap<String, NodeDef>();
043 private boolean complete = false;
044
045 LiteWorkflowApp() {
046 }
047
048 public LiteWorkflowApp(String name, String definition, StartNodeDef startNode) {
049 this.name = ParamChecker.notEmpty(name, "name");
050 this.definition = ParamChecker.notEmpty(definition, "definition");
051 nodesMap.put(StartNodeDef.START, startNode);
052 }
053
054 public boolean equals(LiteWorkflowApp other) {
055 return !(other == null || getClass() != other.getClass() || !getName().equals(other.getName()));
056 }
057
058 public int hashCode() {
059 return name.hashCode();
060 }
061
062 public LiteWorkflowApp addNode(NodeDef node) throws WorkflowException {
063 ParamChecker.notNull(node, "node");
064 if (complete) {
065 throw new WorkflowException(ErrorCode.E0704, name);
066 }
067 if (nodesMap.containsKey(node.getName())) {
068 throw new WorkflowException(ErrorCode.E0705, node.getName());
069 }
070 if (node.getTransitions().contains(node.getName())) {
071 throw new WorkflowException(ErrorCode.E0706, node.getName());
072 }
073 nodesMap.put(node.getName(), node);
074 if (node instanceof EndNodeDef) {
075 complete = true;
076 }
077 return this;
078 }
079
080 public String getName() {
081 return name;
082 }
083
084 public String getDefinition() {
085 return definition;
086 }
087
088 public Collection<NodeDef> getNodeDefs() {
089 return Collections.unmodifiableCollection(nodesMap.values());
090 }
091
092 public NodeDef getNode(String name) {
093 return nodesMap.get(name);
094 }
095
096 public void validateWorkflowIntegrity() {
097 //TODO traverse wf, ensure there are not cycles, no open paths, and one END
098 }
099
100 public void validateTransition(String name, String transition) {
101 ParamChecker.notEmpty(name, "name");
102 ParamChecker.notEmpty(transition, "transition");
103 NodeDef node = getNode(name);
104 if (!node.getTransitions().contains(transition)) {
105 throw new IllegalArgumentException("invalid transition");
106 }
107 }
108
109
110 @Override
111 public void write(DataOutput dataOutput) throws IOException {
112 dataOutput.writeUTF(name);
113 //dataOutput.writeUTF(definition);
114 //writeUTF() has limit 65535, so split long string to multiple short strings
115 List<String> defList = divideStr(definition);
116 dataOutput.writeInt(defList.size());
117 for (String d : defList) {
118 dataOutput.writeUTF(d);
119 }
120 dataOutput.writeInt(nodesMap.size());
121 for (NodeDef n : getNodeDefs()) {
122 dataOutput.writeUTF(n.getClass().getName());
123 n.write(dataOutput);
124 }
125 }
126
127 /**
128 * To split long string to a list of smaller strings.
129 *
130 * @param str
131 * @return List
132 */
133 private List<String> divideStr(String str) {
134 List<String> list = new ArrayList<String>();
135 int len = 20000;
136 int strlen = str.length();
137 int start = 0;
138 int end = len;
139
140 while (end < strlen) {
141 list.add(str.substring(start, end));
142 start = end;
143 end += len;
144 }
145
146 if (strlen <= end) {
147 list.add(str.substring(start, strlen));
148 }
149 return list;
150 }
151
152 @Override
153 public void readFields(DataInput dataInput) throws IOException {
154 name = dataInput.readUTF();
155 //definition = dataInput.readUTF();
156 //read the full definition back
157 int defListSize = dataInput.readInt();
158 StringBuilder sb = new StringBuilder();
159 for (int i = 0; i < defListSize; i++) {
160 sb.append(dataInput.readUTF());
161 }
162 definition = sb.toString();
163
164 int numNodes = dataInput.readInt();
165 for (int x = 0; x < numNodes; x++) {
166 try {
167 String nodeDefClass = dataInput.readUTF();
168 NodeDef node = (NodeDef) ReflectionUtils.newInstance(Class.forName(nodeDefClass), null);
169 node.readFields(dataInput);
170 addNode(node);
171 }
172 catch (WorkflowException ex) {
173 throw new IOException(ex);
174 }
175 catch (ClassNotFoundException e) {
176 throw new IOException(e);
177 }
178 }
179 }
180
181 }