/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.workflow.lite;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.XLog;
import org.apache.oozie.workflow.WorkflowException;

import java.util.ArrayList;
import java.util.List;


/**
 * Node handler that provides the necessary workflow logic for control nodes: START/END/KILL/FORK/JOIN.
 * <p>
 * Node types are matched by exact class equality against {@link StartNodeDef}, {@link EndNodeDef},
 * {@link KillNodeDef}, {@link ForkNodeDef} and {@link JoinNodeDef}; any other node definition class makes
 * every handler method throw {@link IllegalStateException}.
 */
public abstract class ControlNodeHandler extends NodeHandler {

    /**
     * Prefix of the workflow variable that counts the branches of a fork that have not yet reached the join.
     * The full variable name is this prefix followed by the execution path of the fork.
     */
    public static final String FORK_COUNT_PREFIX = "workflow.fork.";

    // Per-instance logger bound to the concrete subclass. Left public and non-final so that code outside
    // this class that reads or reassigns it keeps compiling.
    public XLog LOG = XLog.getLog(getClass());


    /**
     * Called by {@link #enter(Context)} once the control node is actually entered: always for START, END,
     * KILL and FORK nodes, and for a JOIN node only when the arriving branch brings the fork counter down
     * to zero. Note that {@link #enter(Context)} itself always returns <code>false</code>.
     *
     * @param context workflow context
     * @throws WorkflowException thrown if an error occurred.
     */
    public abstract void touch(Context context) throws WorkflowException;

    /**
     * Enters a control node and invokes {@link #touch(Context)} when the node is ready to proceed.
     * <p>
     * A START node requires the signal value to equal {@link StartNodeDef#START}. A JOIN node decrements
     * the fork counter of its parent execution path and is only touched when that counter reaches zero.
     *
     * @param context workflow context
     * @return always <code>false</code>
     * @throws WorkflowException with E0715 if a START node is entered with an unexpected signal value, or
     * with E0720 if a JOIN node finds no fork counter for its parent execution path.
     * @throws IllegalStateException if the node is not one of the five control node types.
     */
    @Override
    public boolean enter(Context context) throws WorkflowException {
        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
        boolean doTouch;
        if (nodeClass.equals(StartNodeDef.class)) {
            if (!context.getSignalValue().equals(StartNodeDef.START)) {
                throw new WorkflowException(ErrorCode.E0715, context.getSignalValue());
            }
            doTouch = true;
        }
        else if (nodeClass.equals(EndNodeDef.class)
                || nodeClass.equals(KillNodeDef.class)
                || nodeClass.equals(ForkNodeDef.class)) {
            doTouch = true;
        }
        else if (nodeClass.equals(JoinNodeDef.class)) {
            doTouch = enterJoin(context);
        }
        else {
            throw new IllegalStateException("Invalid node type: " + nodeClass);
        }
        if (doTouch) {
            touch(context);
        }
        return false;
    }

    /**
     * Exits a control node that has a single outcome.
     *
     * @param context workflow context
     * @return the first transition of a START node; <code>null</code> for END (after completing the job)
     * and KILL (after killing the job).
     * @throws WorkflowException declared by the overridden method; not thrown directly by this code.
     * @throws UnsupportedOperationException for FORK and JOIN nodes, whose transitions are produced by
     * {@link #multiExit(Context)} instead.
     * @throws IllegalStateException if the node is not one of the five control node types.
     */
    @Override
    public String exit(Context context) throws WorkflowException {
        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
        if (nodeClass.equals(StartNodeDef.class)) {
            return context.getNodeDef().getTransitions().get(0);
        }
        else if (nodeClass.equals(EndNodeDef.class)) {
            context.completeJob();
            return null;
        }
        else if (nodeClass.equals(KillNodeDef.class)) {
            context.killJob();
            return null;
        }
        else if (nodeClass.equals(ForkNodeDef.class) || nodeClass.equals(JoinNodeDef.class)) {
            throw new UnsupportedOperationException();
        }
        else {
            throw new IllegalStateException("Invalid node type: " + nodeClass);
        }
    }

    /**
     * Detects a JOIN node being completed more than once. Nothing is checked for START, END, KILL and FORK.
     * <p>
     * For a JOIN node the loop flag variable of the node is set to "true" when the arriving branch is the
     * last outstanding one of the fork; finding the flag already set is reported as an error.
     *
     * @param context workflow context
     * @throws WorkflowException with E0709 if the loop flag of the JOIN node is already set, or with E0720
     * if no fork counter exists for the parent execution path.
     * @throws IllegalStateException if the node is not one of the five control node types.
     */
    @Override
    public void loopDetection(Context context)
            throws WorkflowException {
        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
        if (!nodeClass.equals(JoinNodeDef.class)) {
            // Only the node type is validated for the remaining control nodes.
            requireControlNode(nodeClass);
            return;
        }
        String flag = getLoopFlag(context.getNodeDef().getName());
        if (context.getVar(flag) != null) {
            throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName());
        }
        String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
        String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
        if (forkCount == null) {
            throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
        }
        // The counter itself is not modified here; only the value it would have after this arrival is
        // inspected.
        int count = Integer.parseInt(forkCount) - 1;
        if (count == 0) {
            context.setVar(flag, "true");
        }
    }

    /**
     * Produces the outgoing transitions of a control node.
     * <p>
     * START, END and KILL delegate to the inherited implementation. A FORK node stores the number of its
     * transitions in the fork counter variable of the current execution path and returns one full
     * transition per branch, each on a newly created child execution path. A JOIN node deletes the current
     * execution path and returns its single transition on the parent execution path.
     *
     * @param context workflow context
     * @return list of full transitions to follow.
     * @throws WorkflowException thrown if an error occurred.
     * @throws IllegalStateException if the node is not one of the five control node types.
     */
    @Override
    public List<String> multiExit(Context context)
            throws WorkflowException {
        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
        if (nodeClass.equals(StartNodeDef.class)
                || nodeClass.equals(EndNodeDef.class)
                || nodeClass.equals(KillNodeDef.class)) {
            return super.multiExit(context);
        }
        else if (nodeClass.equals(ForkNodeDef.class)) {
            List<String> transitions = context.getNodeDef().getTransitions();
            // One counter unit per branch; JOIN decrements it in enter(Context).
            context.setVar(FORK_COUNT_PREFIX + context.getExecutionPath(), String.valueOf(transitions.size()));

            List<String> fullTransitions = new ArrayList<String>(transitions.size());
            for (String transition : transitions) {
                String childExecutionPath = context.createExecutionPath(transition);
                fullTransitions.add(context.createFullTransition(childExecutionPath, transition));
            }
            return fullTransitions;
        }
        else if (nodeClass.equals(JoinNodeDef.class)) {
            // The parent path must be resolved before the current execution path is deleted.
            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
            context.deleteExecutionPath();

            String transition = context.getNodeDef().getTransitions().get(0);
            List<String> fullTransitions = new ArrayList<String>(1);
            fullTransitions.add(context.createFullTransition(parentExecutionPath, transition));
            return fullTransitions;
        }
        else {
            throw new IllegalStateException("Invalid node type: " + nodeClass);
        }
    }

    /**
     * Kill callback. Does nothing for control nodes apart from validating the node type.
     *
     * @param context workflow context
     * @throws IllegalStateException if the node is not one of the five control node types.
     */
    @Override
    public void kill(Context context) {
        requireControlNode(context.getNodeDef().getClass());
    }

    /**
     * Fail callback. Does nothing for control nodes apart from validating the node type.
     *
     * @param context workflow context
     * @throws IllegalStateException if the node is not one of the five control node types.
     */
    @Override
    public void fail(Context context) {
        requireControlNode(context.getNodeDef().getClass());
    }

    /**
     * Registers the arrival of one fork branch at a JOIN node: decrements the fork counter of the parent
     * execution path, deleting the current execution path while other branches are still outstanding and
     * clearing the counter variable otherwise.
     *
     * @param context workflow context
     * @return <code>true</code> only if the decremented counter is exactly zero.
     * @throws WorkflowException with E0720 if no fork counter exists for the parent execution path.
     */
    private boolean enterJoin(Context context) throws WorkflowException {
        String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
        String forkCountVar = FORK_COUNT_PREFIX + parentExecutionPath;
        String forkCount = context.getVar(forkCountVar);

        if (forkCount == null) {
            throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
        }
        int count = Integer.parseInt(forkCount) - 1;
        if (count > 0) {
            context.setVar(forkCountVar, String.valueOf(count));
            context.deleteExecutionPath();
        }
        else {
            context.setVar(forkCountVar, null);
        }
        LOG.debug("count = " + count + " for parent execution path " + parentExecutionPath);

        return count == 0;
    }

    /**
     * Verifies that a node definition class is exactly one of the five control node classes.
     *
     * @param nodeClass class of the node definition being handled.
     * @throws IllegalStateException if the class is none of the control node classes.
     */
    private static void requireControlNode(Class<? extends NodeDef> nodeClass) {
        boolean controlNode = nodeClass.equals(StartNodeDef.class)
                || nodeClass.equals(EndNodeDef.class)
                || nodeClass.equals(KillNodeDef.class)
                || nodeClass.equals(ForkNodeDef.class)
                || nodeClass.equals(JoinNodeDef.class);
        if (!controlNode) {
            throw new IllegalStateException("Invalid node type: " + nodeClass);
        }
    }
}