001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.workflow.lite;
020
021import org.apache.oozie.ErrorCode;
022import org.apache.oozie.util.XLog;
023import org.apache.oozie.workflow.WorkflowException;
024
025import java.util.ArrayList;
026import java.util.List;
027
028
029/**
030 * Node handler that provides the necessary workflow logic for control nodes: START/END/KILL/FORK/JOIN.
031 */
032public abstract class ControlNodeHandler extends NodeHandler {
033
034    public static final String FORK_COUNT_PREFIX = "workflow.fork.";
035    public XLog LOG = XLog.getLog(getClass());
036
037
038    /**
039     * Called by {@link #enter(Context)} when returning TRUE.
040     *
041     * @param context workflow context
042     * @throws WorkflowException thrown if an error occurred.
043     */
044    public abstract void touch(Context context) throws WorkflowException;
045
046    @Override
047    public boolean enter(Context context) throws WorkflowException {
048        boolean doTouch;
049        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
050        if (nodeClass.equals(StartNodeDef.class)) {
051            if (!context.getSignalValue().equals(StartNodeDef.START)) {
052                throw new WorkflowException(ErrorCode.E0715, context.getSignalValue());
053            }
054            doTouch = true;
055        }
056        else if (nodeClass.equals(EndNodeDef.class)) {
057            doTouch = true;
058        }
059        else if (nodeClass.equals(KillNodeDef.class)) {
060            doTouch = true;
061        }
062        else if (nodeClass.equals(ForkNodeDef.class)) {
063            doTouch = true;
064        }
065        else if (nodeClass.equals(JoinNodeDef.class)) {
066            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
067            String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
068
069            if (forkCount == null) {
070                throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
071            }
072            int count = Integer.parseInt(forkCount) - 1;
073            if (count > 0) {
074                context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, "" + count);
075                context.deleteExecutionPath();
076            }
077            else {
078                context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, null);
079            }
080            LOG.debug("count = " + count + " for parent execution path " + parentExecutionPath);
081
082            doTouch = (count == 0);
083        }
084        else {
085            throw new IllegalStateException("Invalid node type: " + nodeClass);
086        }
087        if (doTouch) {
088            touch(context);
089        }
090        return false;
091    }
092
093    @Override
094    public String exit(Context context) throws WorkflowException {
095        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
096        if (nodeClass.equals(StartNodeDef.class)) {
097            return context.getNodeDef().getTransitions().get(0);
098        }
099        else if (nodeClass.equals(EndNodeDef.class)) {
100            context.completeJob();
101            return null;
102        }
103        else if (nodeClass.equals(KillNodeDef.class)) {
104            context.killJob();
105            return null;
106        }
107        else if (nodeClass.equals(ForkNodeDef.class)) {
108            throw new UnsupportedOperationException();
109        }
110        else if (nodeClass.equals(JoinNodeDef.class)) {
111            throw new UnsupportedOperationException();
112        }
113        else {
114            throw new IllegalStateException("Invalid node type: " + nodeClass);
115        }
116    }
117
118    @Override
119    public void loopDetection(Context context)
120        throws WorkflowException {
121        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
122        if (nodeClass.equals(StartNodeDef.class)) {
123        }
124        else if (nodeClass.equals(EndNodeDef.class)) {
125        }
126        else if (nodeClass.equals(KillNodeDef.class)) {
127        }
128        else if (nodeClass.equals(ForkNodeDef.class)) {
129        }
130        else if (nodeClass.equals(JoinNodeDef.class)) {
131            String flag = getLoopFlag(context.getNodeDef().getName());
132            if (context.getVar(flag) != null) {
133                throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName());
134            }
135            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
136            String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
137            if (forkCount == null) {
138                throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
139            }
140            int count = Integer.parseInt(forkCount) - 1;
141            if (count == 0) {
142                context.setVar(flag, "true");
143            }
144
145        }
146        else {
147            throw new IllegalStateException("Invalid node type: " + nodeClass);
148        }
149    }
150
151    @Override
152    public List<String> multiExit(Context context)
153        throws WorkflowException {
154        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
155        if (nodeClass.equals(StartNodeDef.class)) {
156            return super.multiExit(context);
157        }
158        else if (nodeClass.equals(EndNodeDef.class)) {
159            return super.multiExit(context);
160        }
161        else if (nodeClass.equals(KillNodeDef.class)) {
162            return super.multiExit(context);
163        }
164        else if (nodeClass.equals(ForkNodeDef.class)) {
165            List<String> transitions = context.getNodeDef().getTransitions();
166            context.setVar(FORK_COUNT_PREFIX + context.getExecutionPath(), "" + transitions.size());
167
168            List<String> fullTransitions = new ArrayList<String>(transitions.size());
169
170            for (String transition : transitions) {
171                String childExecutionPath = context.createExecutionPath(transition);
172                String fullTransition = context.createFullTransition(childExecutionPath, transition);
173                fullTransitions.add(fullTransition);
174            }
175            return fullTransitions;
176        }
177        else if (nodeClass.equals(JoinNodeDef.class)) {
178            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
179            // NOW we delete..
180            context.deleteExecutionPath();
181
182            String transition = context.getNodeDef().getTransitions().get(0);
183            String fullTransition = context.createFullTransition(parentExecutionPath, transition);
184            List<String> transitions = new ArrayList<String>(1);
185            transitions.add(fullTransition);
186            return transitions;
187        }
188        else {
189            throw new IllegalStateException("Invalid node type: " + nodeClass);
190        }
191    }
192
193    @Override
194    public void kill(Context context) {
195        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
196        if (nodeClass.equals(StartNodeDef.class)) {
197            //NOP
198        }
199        else if (nodeClass.equals(EndNodeDef.class)) {
200            //NOP
201        }
202        else if (nodeClass.equals(KillNodeDef.class)) {
203            //NOP
204        }
205        else if (nodeClass.equals(ForkNodeDef.class)) {
206            //NOP
207        }
208        else if (nodeClass.equals(JoinNodeDef.class)) {
209            //NOP
210        }
211        else {
212            throw new IllegalStateException("Invalid node type: " + nodeClass);
213        }
214    }
215
216    @Override
217    public void fail(Context context) {
218        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
219        if (nodeClass.equals(StartNodeDef.class)) {
220            //NOP
221        }
222        else if (nodeClass.equals(EndNodeDef.class)) {
223            //NOP
224        }
225        else if (nodeClass.equals(KillNodeDef.class)) {
226            //NOP
227        }
228        else if (nodeClass.equals(ForkNodeDef.class)) {
229            //NOP
230        }
231        else if (nodeClass.equals(JoinNodeDef.class)) {
232            //NOP
233        }
234        else {
235            throw new IllegalStateException("Invalid node type: " + nodeClass);
236        }
237    }
238}