001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.workflow.lite;
020
021import org.apache.hadoop.io.Writable;
022import org.apache.hadoop.util.ReflectionUtils;
023import org.apache.oozie.workflow.WorkflowApp;
024import org.apache.oozie.workflow.WorkflowException;
025import org.apache.oozie.util.ParamChecker;
026import org.apache.oozie.util.XLog;
027import org.apache.oozie.ErrorCode;
028
029import java.io.DataInput;
030import java.io.DataOutput;
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.LinkedHashMap;
036import java.util.List;
037import java.util.Map;
038
039//TODO javadoc
040public class LiteWorkflowApp implements Writable, WorkflowApp {
041    private String name;
042    private String definition;
043    private Map<String, NodeDef> nodesMap = new LinkedHashMap<String, NodeDef>();
044    private boolean complete = false;
045
046    LiteWorkflowApp() {
047    }
048
049    public LiteWorkflowApp(String name, String definition, StartNodeDef startNode) {
050        this.name = ParamChecker.notEmpty(name, "name");
051        this.definition = ParamChecker.notEmpty(definition, "definition");
052        nodesMap.put(StartNodeDef.START, startNode);
053    }
054
055    public boolean equals(LiteWorkflowApp other) {
056        return !(other == null || getClass() != other.getClass() || !getName().equals(other.getName()));
057    }
058
059    public int hashCode() {
060        return name.hashCode();
061    }
062
063    public LiteWorkflowApp addNode(NodeDef node) throws WorkflowException {
064        ParamChecker.notNull(node, "node");
065        if (complete) {
066            throw new WorkflowException(ErrorCode.E0704, name);
067        }
068        if (nodesMap.containsKey(node.getName())) {
069            throw new WorkflowException(ErrorCode.E0705, node.getName());
070        }
071        if (node.getTransitions().contains(node.getName())) {
072            throw new WorkflowException(ErrorCode.E0706, node.getName());
073        }
074        nodesMap.put(node.getName(), node);
075        if (node instanceof EndNodeDef) {
076            complete = true;
077        }
078        return this;
079    }
080
081    public String getName() {
082        return name;
083    }
084
085    public String getDefinition() {
086        return definition;
087    }
088
089    public Collection<NodeDef> getNodeDefs() {
090        return Collections.unmodifiableCollection(nodesMap.values());
091    }
092
093    public NodeDef getNode(String name) {
094        return nodesMap.get(name);
095    }
096
097    public void validateWorkflowIntegrity() {
098        //TODO traverse wf, ensure there are not cycles, no open paths, and one END
099    }
100
101    public void validateTransition(String name, String transition) {
102        ParamChecker.notEmpty(name, "name");
103        ParamChecker.notEmpty(transition, "transition");
104        NodeDef node = getNode(name);
105        if (!node.getTransitions().contains(transition)) {
106            throw new IllegalArgumentException("invalid transition");
107        }
108    }
109
110
111    @Override
112    public void write(DataOutput dataOutput) throws IOException {
113        dataOutput.writeUTF(name);
114        //dataOutput.writeUTF(definition);
115        //writeUTF() has limit 65535, so split long string to multiple short strings
116        List<String> defList = divideStr(definition);
117        dataOutput.writeInt(defList.size());
118        for (String d : defList) {
119            dataOutput.writeUTF(d);
120        }
121        dataOutput.writeInt(nodesMap.size());
122        for (NodeDef n : getNodeDefs()) {
123            dataOutput.writeUTF(n.getClass().getName());
124            n.write(dataOutput);
125        }
126    }
127
128    /**
129     * To split long string to a list of smaller strings.
130     *
131     * @param str
132     * @return List
133     */
134    private List<String> divideStr(String str) {
135        List<String> list = new ArrayList<String>();
136        int len = 20000;
137        int strlen = str.length();
138        int start = 0;
139        int end = len;
140
141        while (end < strlen) {
142            list.add(str.substring(start, end));
143            start = end;
144            end += len;
145        }
146
147        if (strlen <= end) {
148            list.add(str.substring(start, strlen));
149        }
150        return list;
151    }
152
153    @Override
154    public void readFields(DataInput dataInput) throws IOException {
155        name = dataInput.readUTF();
156        //definition = dataInput.readUTF();
157        //read the full definition back
158        int defListSize = dataInput.readInt();
159        StringBuilder sb = new StringBuilder();
160        for (int i = 0; i < defListSize; i++) {
161            sb.append(dataInput.readUTF());
162        }
163        definition = sb.toString();
164
165        int numNodes = dataInput.readInt();
166        for (int x = 0; x < numNodes; x++) {
167            try {
168                String nodeDefClass = dataInput.readUTF();
169                NodeDef node = (NodeDef) ReflectionUtils.newInstance(Class.forName(nodeDefClass), null);
170                node.readFields(dataInput);
171                addNode(node);
172            }
173            catch (WorkflowException ex) {
174                throw new IOException(ex);
175            }
176            catch (ClassNotFoundException e) {
177                throw new IOException(e);
178            }
179        }
180    }
181
182}