001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.workflow.lite;
019
020import org.apache.hadoop.io.Writable;
021import org.apache.hadoop.util.ReflectionUtils;
022import org.apache.oozie.workflow.WorkflowApp;
023import org.apache.oozie.workflow.WorkflowException;
024import org.apache.oozie.util.ParamChecker;
025import org.apache.oozie.util.XLog;
026import org.apache.oozie.ErrorCode;
027
028import java.io.DataInput;
029import java.io.DataOutput;
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.Collection;
033import java.util.Collections;
034import java.util.LinkedHashMap;
035import java.util.List;
036import java.util.Map;
037
038//TODO javadoc
039public class LiteWorkflowApp implements Writable, WorkflowApp {
040    private String name;
041    private String definition;
042    private Map<String, NodeDef> nodesMap = new LinkedHashMap<String, NodeDef>();
043    private boolean complete = false;
044
045    LiteWorkflowApp() {
046    }
047
048    public LiteWorkflowApp(String name, String definition, StartNodeDef startNode) {
049        this.name = ParamChecker.notEmpty(name, "name");
050        this.definition = ParamChecker.notEmpty(definition, "definition");
051        nodesMap.put(StartNodeDef.START, startNode);
052    }
053
054    public boolean equals(LiteWorkflowApp other) {
055        return !(other == null || getClass() != other.getClass() || !getName().equals(other.getName()));
056    }
057
058    public int hashCode() {
059        return name.hashCode();
060    }
061
062    public LiteWorkflowApp addNode(NodeDef node) throws WorkflowException {
063        ParamChecker.notNull(node, "node");
064        if (complete) {
065            throw new WorkflowException(ErrorCode.E0704, name);
066        }
067        if (nodesMap.containsKey(node.getName())) {
068            throw new WorkflowException(ErrorCode.E0705, node.getName());
069        }
070        if (node.getTransitions().contains(node.getName())) {
071            throw new WorkflowException(ErrorCode.E0706, node.getName());
072        }
073        nodesMap.put(node.getName(), node);
074        if (node instanceof EndNodeDef) {
075            complete = true;
076        }
077        return this;
078    }
079
080    public String getName() {
081        return name;
082    }
083
084    public String getDefinition() {
085        return definition;
086    }
087
088    public Collection<NodeDef> getNodeDefs() {
089        return Collections.unmodifiableCollection(nodesMap.values());
090    }
091
092    public NodeDef getNode(String name) {
093        return nodesMap.get(name);
094    }
095
096    public void validateWorkflowIntegrity() {
097        //TODO traverse wf, ensure there are not cycles, no open paths, and one END
098    }
099
100    public void validateTransition(String name, String transition) {
101        ParamChecker.notEmpty(name, "name");
102        ParamChecker.notEmpty(transition, "transition");
103        NodeDef node = getNode(name);
104        if (!node.getTransitions().contains(transition)) {
105            throw new IllegalArgumentException("invalid transition");
106        }
107    }
108
109
110    @Override
111    public void write(DataOutput dataOutput) throws IOException {
112        dataOutput.writeUTF(name);
113        //dataOutput.writeUTF(definition);
114        //writeUTF() has limit 65535, so split long string to multiple short strings
115        List<String> defList = divideStr(definition);
116        dataOutput.writeInt(defList.size());
117        for (String d : defList) {
118            dataOutput.writeUTF(d);
119        }
120        dataOutput.writeInt(nodesMap.size());
121        for (NodeDef n : getNodeDefs()) {
122            dataOutput.writeUTF(n.getClass().getName());
123            n.write(dataOutput);
124        }
125    }
126
127    /**
128     * To split long string to a list of smaller strings.
129     *
130     * @param str
131     * @return List
132     */
133    private List<String> divideStr(String str) {
134        List<String> list = new ArrayList<String>();
135        int len = 20000;
136        int strlen = str.length();
137        int start = 0;
138        int end = len;
139
140        while (end < strlen) {
141            list.add(str.substring(start, end));
142            start = end;
143            end += len;
144        }
145
146        if (strlen <= end) {
147            list.add(str.substring(start, strlen));
148        }
149        return list;
150    }
151
152    @Override
153    public void readFields(DataInput dataInput) throws IOException {
154        name = dataInput.readUTF();
155        //definition = dataInput.readUTF();
156        //read the full definition back
157        int defListSize = dataInput.readInt();
158        StringBuilder sb = new StringBuilder();
159        for (int i = 0; i < defListSize; i++) {
160            sb.append(dataInput.readUTF());
161        }
162        definition = sb.toString();
163
164        int numNodes = dataInput.readInt();
165        for (int x = 0; x < numNodes; x++) {
166            try {
167                String nodeDefClass = dataInput.readUTF();
168                NodeDef node = (NodeDef) ReflectionUtils.newInstance(Class.forName(nodeDefClass), null);
169                node.readFields(dataInput);
170                addNode(node);
171            }
172            catch (WorkflowException ex) {
173                throw new IOException(ex);
174            }
175            catch (ClassNotFoundException e) {
176                throw new IOException(e);
177            }
178        }
179    }
180
181}