This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.workflow.lite;
019
020 import org.apache.hadoop.io.Writable;
021 import org.apache.hadoop.util.ReflectionUtils;
022 import org.apache.oozie.workflow.WorkflowApp;
023 import org.apache.oozie.workflow.WorkflowException;
024 import org.apache.oozie.util.ParamChecker;
025 import org.apache.oozie.util.XLog;
026 import org.apache.oozie.ErrorCode;
027
028 import java.io.DataInput;
029 import java.io.DataOutput;
030 import java.io.IOException;
031 import java.util.ArrayList;
032 import java.util.Collection;
033 import java.util.Collections;
034 import java.util.LinkedHashMap;
035 import java.util.List;
036 import java.util.Map;
037
038 //TODO javadoc
039 public class LiteWorkflowApp implements Writable, WorkflowApp {
040 private String name;
041 private String definition;
042 private Map<String, NodeDef> nodesMap = new LinkedHashMap<String, NodeDef>();
043 private boolean complete = false;
044
045 LiteWorkflowApp() {
046 }
047
048 public LiteWorkflowApp(String name, String definition, StartNodeDef startNode) {
049 this.name = ParamChecker.notEmpty(name, "name");
050 this.definition = ParamChecker.notEmpty(definition, "definition");
051 nodesMap.put(StartNodeDef.START, startNode);
052 }
053
054 public boolean equals(LiteWorkflowApp other) {
055 return !(other == null || getClass() != other.getClass() || !getName().equals(other.getName()));
056 }
057
058 public int hashCode() {
059 return name.hashCode();
060 }
061
062 public LiteWorkflowApp addNode(NodeDef node) throws WorkflowException {
063 ParamChecker.notNull(node, "node");
064 if (complete) {
065 throw new WorkflowException(ErrorCode.E0704,
066 XLog.format("Definition [{0}] already complete", name));
067 }
068 if (nodesMap.containsKey(node.getName())) {
069 throw new WorkflowException(ErrorCode.E0705,
070 XLog.format("Node [{0}] already defined", node.getName()));
071 }
072 if (node.getTransitions().contains(node.getName())) {
073 throw new WorkflowException(ErrorCode.E0706,
074 XLog.format("Node [{0}] cannot transition to itself", node.getName()));
075 }
076 nodesMap.put(node.getName(), node);
077 if (node instanceof EndNodeDef) {
078 complete = true;
079 }
080 return this;
081 }
082
083 public String getName() {
084 return name;
085 }
086
087 public String getDefinition() {
088 return definition;
089 }
090
091 public Collection<NodeDef> getNodeDefs() {
092 return Collections.unmodifiableCollection(nodesMap.values());
093 }
094
095 public NodeDef getNode(String name) {
096 return nodesMap.get(name);
097 }
098
099 public void validateWorkflowIntegrity() {
100 //TODO traverse wf, ensure there are not cycles, no open paths, and one END
101 }
102
103 public void validateTransition(String name, String transition) {
104 ParamChecker.notEmpty(name, "name");
105 ParamChecker.notEmpty(transition, "transition");
106 NodeDef node = getNode(name);
107 if (!node.getTransitions().contains(transition)) {
108 throw new IllegalArgumentException("invalid transition");
109 }
110 }
111
112
113 @Override
114 public void write(DataOutput dataOutput) throws IOException {
115 dataOutput.writeUTF(name);
116 //dataOutput.writeUTF(definition);
117 //writeUTF() has limit 65535, so split long string to multiple short strings
118 List<String> defList = divideStr(definition);
119 dataOutput.writeInt(defList.size());
120 for (String d : defList) {
121 dataOutput.writeUTF(d);
122 }
123 dataOutput.writeInt(nodesMap.size());
124 for (NodeDef n : getNodeDefs()) {
125 dataOutput.writeUTF(n.getClass().getName());
126 n.write(dataOutput);
127 }
128 }
129
130 /**
131 * To split long string to a list of smaller strings.
132 *
133 * @param str
134 * @return List
135 */
136 private List<String> divideStr(String str) {
137 List<String> list = new ArrayList<String>();
138 int len = 20000;
139 int strlen = str.length();
140 int start = 0;
141 int end = len;
142
143 while (end < strlen) {
144 list.add(str.substring(start, end));
145 start = end;
146 end += len;
147 }
148
149 if (strlen <= end) {
150 list.add(str.substring(start, strlen));
151 }
152 return list;
153 }
154
155 @Override
156 public void readFields(DataInput dataInput) throws IOException {
157 name = dataInput.readUTF();
158 //definition = dataInput.readUTF();
159 //read the full definition back
160 int defListSize = dataInput.readInt();
161 StringBuilder sb = new StringBuilder();
162 for (int i = 0; i < defListSize; i++) {
163 sb.append(dataInput.readUTF());
164 }
165 definition = sb.toString();
166
167 int numNodes = dataInput.readInt();
168 for (int x = 0; x < numNodes; x++) {
169 try {
170 String nodeDefClass = dataInput.readUTF();
171 NodeDef node = (NodeDef) ReflectionUtils.newInstance(Class.forName(nodeDefClass), null);
172 node.readFields(dataInput);
173 addNode(node);
174 }
175 catch (WorkflowException ex) {
176 throw new IOException(ex);
177 }
178 catch (ClassNotFoundException e) {
179 throw new IOException(e);
180 }
181 }
182 }
183
184 }