001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.workflow.lite;
019
020import org.apache.oozie.ErrorCode;
021import org.apache.oozie.workflow.WorkflowException;
022
023import java.util.ArrayList;
024import java.util.List;
025
026
027/**
028 * Node handler that provides the necessary workflow logic for control nodes: START/END/KILL/FORK/JOIN.
029 */
030public abstract class ControlNodeHandler extends NodeHandler {
031
032    public static final String FORK_COUNT_PREFIX = "workflow.fork.";
033
034    /**
035     * Called by {@link #enter(Context)} when returning TRUE.
036     *
037     * @param context workflow context
038     * @throws WorkflowException thrown if an error occurred.
039     */
040    public abstract void touch(Context context) throws WorkflowException;
041
042    @Override
043    public boolean enter(Context context) throws WorkflowException {
044        boolean doTouch;
045        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
046        if (nodeClass.equals(StartNodeDef.class)) {
047            if (!context.getSignalValue().equals(StartNodeDef.START)) {
048                throw new WorkflowException(ErrorCode.E0715, context.getSignalValue());
049            }
050            doTouch = true;
051        }
052        else if (nodeClass.equals(EndNodeDef.class)) {
053            doTouch = true;
054        }
055        else if (nodeClass.equals(KillNodeDef.class)) {
056            doTouch = true;
057        }
058        else if (nodeClass.equals(ForkNodeDef.class)) {
059            doTouch = true;
060        }
061        else if (nodeClass.equals(JoinNodeDef.class)) {
062            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
063            String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
064            if (forkCount == null) {
065                throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
066            }
067            int count = Integer.parseInt(forkCount) - 1;
068            if (count > 0) {
069                context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, "" + count);
070                context.deleteExecutionPath();
071            }
072            else {
073                context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, null);
074            }
075            doTouch = (count == 0);
076        }
077        else {
078            throw new IllegalStateException("Invalid node type: " + nodeClass);
079        }
080        if (doTouch) {
081            touch(context);
082        }
083        return false;
084    }
085
086    @Override
087    public String exit(Context context) throws WorkflowException {
088        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
089        if (nodeClass.equals(StartNodeDef.class)) {
090            return context.getNodeDef().getTransitions().get(0);
091        }
092        else if (nodeClass.equals(EndNodeDef.class)) {
093            context.completeJob();
094            return null;
095        }
096        else if (nodeClass.equals(KillNodeDef.class)) {
097            context.killJob();
098            return null;
099        }
100        else if (nodeClass.equals(ForkNodeDef.class)) {
101            throw new UnsupportedOperationException();
102        }
103        else if (nodeClass.equals(JoinNodeDef.class)) {
104            throw new UnsupportedOperationException();
105        }
106        else {
107            throw new IllegalStateException("Invalid node type: " + nodeClass);
108        }
109    }
110
111    @Override
112    public void loopDetection(Context context)
113        throws WorkflowException {
114        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
115        if (nodeClass.equals(StartNodeDef.class)) {
116        }
117        else if (nodeClass.equals(EndNodeDef.class)) {
118        }
119        else if (nodeClass.equals(KillNodeDef.class)) {
120        }
121        else if (nodeClass.equals(ForkNodeDef.class)) {
122        }
123        else if (nodeClass.equals(JoinNodeDef.class)) {
124            String flag = getLoopFlag(context.getNodeDef().getName());
125            if (context.getVar(flag) != null) {
126                throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName());
127            }
128            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
129            String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
130            if (forkCount == null) {
131                throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
132            }
133            int count = Integer.parseInt(forkCount) - 1;
134            if (count == 0) {
135                context.setVar(flag, "true");
136            }
137
138        }
139        else {
140            throw new IllegalStateException("Invalid node type: " + nodeClass);
141        }
142    }
143
144    @Override
145    public List<String> multiExit(Context context)
146        throws WorkflowException {
147        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
148        if (nodeClass.equals(StartNodeDef.class)) {
149            return super.multiExit(context);
150        }
151        else if (nodeClass.equals(EndNodeDef.class)) {
152            return super.multiExit(context);
153        }
154        else if (nodeClass.equals(KillNodeDef.class)) {
155            return super.multiExit(context);
156        }
157        else if (nodeClass.equals(ForkNodeDef.class)) {
158            List<String> transitions = context.getNodeDef().getTransitions();
159            context.setVar(FORK_COUNT_PREFIX + context.getExecutionPath(), "" + transitions.size());
160
161            List<String> fullTransitions = new ArrayList<String>(transitions.size());
162
163            for (String transition : transitions) {
164                String childExecutionPath = context.createExecutionPath(transition);
165                String fullTransition = context.createFullTransition(childExecutionPath, transition);
166                fullTransitions.add(fullTransition);
167            }
168            return fullTransitions;
169        }
170        else if (nodeClass.equals(JoinNodeDef.class)) {
171            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
172            // NOW we delete..
173            context.deleteExecutionPath();
174
175            String transition = context.getNodeDef().getTransitions().get(0);
176            String fullTransition = context.createFullTransition(parentExecutionPath, transition);
177            List<String> transitions = new ArrayList<String>(1);
178            transitions.add(fullTransition);
179            return transitions;
180        }
181        else {
182            throw new IllegalStateException("Invalid node type: " + nodeClass);
183        }
184    }
185
186    @Override
187    public void kill(Context context) {
188        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
189        if (nodeClass.equals(StartNodeDef.class)) {
190            //NOP
191        }
192        else if (nodeClass.equals(EndNodeDef.class)) {
193            //NOP
194        }
195        else if (nodeClass.equals(KillNodeDef.class)) {
196            //NOP
197        }
198        else if (nodeClass.equals(ForkNodeDef.class)) {
199            //NOP
200        }
201        else if (nodeClass.equals(JoinNodeDef.class)) {
202            //NOP
203        }
204        else {
205            throw new IllegalStateException("Invalid node type: " + nodeClass);
206        }
207    }
208
209    @Override
210    public void fail(Context context) {
211        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
212        if (nodeClass.equals(StartNodeDef.class)) {
213            //NOP
214        }
215        else if (nodeClass.equals(EndNodeDef.class)) {
216            //NOP
217        }
218        else if (nodeClass.equals(KillNodeDef.class)) {
219            //NOP
220        }
221        else if (nodeClass.equals(ForkNodeDef.class)) {
222            //NOP
223        }
224        else if (nodeClass.equals(JoinNodeDef.class)) {
225            //NOP
226        }
227        else {
228            throw new IllegalStateException("Invalid node type: " + nodeClass);
229        }
230    }
231}