001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.workflow.lite;
020
021import org.apache.hadoop.io.Writable;
022import org.apache.hadoop.util.ReflectionUtils;
023import org.apache.oozie.util.StringSerializationUtil;
024import org.apache.oozie.workflow.WorkflowApp;
025import org.apache.oozie.workflow.WorkflowException;
026import org.apache.oozie.util.ParamChecker;
027import org.apache.oozie.ErrorCode;
028
029import java.io.DataInput;
030import java.io.DataOutput;
031import java.io.IOException;
032import java.util.Collection;
033import java.util.Collections;
034import java.util.LinkedHashMap;
035import java.util.Map;
036
037//TODO javadoc
038public class LiteWorkflowApp implements Writable, WorkflowApp {
039    /**
040     * Serialization of strings longer than 65k changed. This flag marks which method to use during reading.
041     */
042    public static final int NEW_SERIALIZATION_METHOD_FLAG = -1;
043    private String name;
044    private String definition;
045    private Map<String, NodeDef> nodesMap = new LinkedHashMap<String, NodeDef>();
046    private boolean complete = false;
047
048    LiteWorkflowApp() {
049    }
050
051    public LiteWorkflowApp(String name, String definition, StartNodeDef startNode) {
052        this.name = ParamChecker.notEmpty(name, "name");
053        this.definition = ParamChecker.notEmpty(definition, "definition");
054        nodesMap.put(StartNodeDef.START, startNode);
055    }
056
057    public boolean equals(LiteWorkflowApp other) {
058        return !(other == null || getClass() != other.getClass() || !getName().equals(other.getName()));
059    }
060
061    public int hashCode() {
062        return name.hashCode();
063    }
064
065    public LiteWorkflowApp addNode(NodeDef node) throws WorkflowException {
066        ParamChecker.notNull(node, "node");
067        if (complete) {
068            throw new WorkflowException(ErrorCode.E0704, name);
069        }
070        if (nodesMap.containsKey(node.getName())) {
071            throw new WorkflowException(ErrorCode.E0705, node.getName());
072        }
073        if (node.getTransitions().contains(node.getName())) {
074            throw new WorkflowException(ErrorCode.E0706, node.getName());
075        }
076        nodesMap.put(node.getName(), node);
077        if (node instanceof EndNodeDef) {
078            complete = true;
079        }
080        return this;
081    }
082
083    public String getName() {
084        return name;
085    }
086
087    public String getDefinition() {
088        return definition;
089    }
090
091    public Collection<NodeDef> getNodeDefs() {
092        return Collections.unmodifiableCollection(nodesMap.values());
093    }
094
095    public NodeDef getNode(String name) {
096        return nodesMap.get(name);
097    }
098
099    public void validateWorkflowIntegrity() {
100        //TODO traverse wf, ensure there are not cycles, no open paths, and one END
101    }
102
103    public void validateTransition(String name, String transition) {
104        ParamChecker.notEmpty(name, "name");
105        ParamChecker.notEmpty(transition, "transition");
106        NodeDef node = getNode(name);
107        if (!node.getTransitions().contains(transition)) {
108            throw new IllegalArgumentException("invalid transition");
109        }
110    }
111
112
113    @Override
114    public void write(DataOutput dataOutput) throws IOException {
115        dataOutput.writeUTF(name);
116        // write out -1 as a marker to use StringSerializationUtil. Previously it was split to 20k long bits in a list.
117        dataOutput.writeInt(NEW_SERIALIZATION_METHOD_FLAG);
118        StringSerializationUtil.writeString(dataOutput, definition);
119        dataOutput.writeInt(nodesMap.size());
120        for (NodeDef n : getNodeDefs()) {
121            dataOutput.writeUTF(n.getClass().getName());
122            n.write(dataOutput);
123        }
124    }
125
126    @Override
127    public void readFields(DataInput dataInput) throws IOException {
128        name = dataInput.readUTF();
129        //read the full definition back
130        int definitionListFlag = dataInput.readInt();
131        if(definitionListFlag > NEW_SERIALIZATION_METHOD_FLAG) {
132            // negative number marking the usage of StringSerializationUtil
133            // positive number is the length of the array the String was broken into.
134            StringBuilder sb = new StringBuilder();
135            for (int i = 0; i < definitionListFlag; i++) {
136                sb.append(dataInput.readUTF());
137            }
138            definition = sb.toString();
139        } else {
140            definition = StringSerializationUtil.readString(dataInput);
141        }
142        int numNodes = dataInput.readInt();
143        for (int x = 0; x < numNodes; x++) {
144            try {
145                String nodeDefClass = dataInput.readUTF();
146                NodeDef node = (NodeDef) ReflectionUtils.newInstance(Class.forName(nodeDefClass), null);
147                node.readFields(dataInput);
148                addNode(node);
149            }
150            catch (WorkflowException ex) {
151                throw new IOException(ex);
152            }
153            catch (ClassNotFoundException e) {
154                throw new IOException(e);
155            }
156        }
157    }
158
159}