/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.workflow.lite;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.workflow.WorkflowException;

import java.util.ArrayList;
import java.util.List;


/**
 * Node handler that provides the necessary workflow logic for control nodes: START/END/KILL/FORK/JOIN.
 * <p>
 * Every callback dispatches on the concrete class of the current node definition; any class other than
 * {@link StartNodeDef}, {@link EndNodeDef}, {@link KillNodeDef}, {@link ForkNodeDef} or {@link JoinNodeDef}
 * is rejected with an {@link IllegalStateException}.
 * <p>
 * FORK/JOIN bookkeeping: when a FORK node exits, the number of its transitions is stored in the workflow
 * variable {@link #FORK_COUNT_PREFIX}<code> + executionPath</code>. Each arrival at the JOIN node reads that
 * variable through the parent execution path and decrements it; the JOIN is only touched once the
 * decremented value reaches zero.
 */
public abstract class ControlNodeHandler extends NodeHandler {

    /**
     * Prefix of the workflow variable that holds the number of fork paths still expected at a JOIN.
     * The execution path of the FORK node is appended to it to form the variable name.
     */
    public static final String FORK_COUNT_PREFIX = "workflow.fork.";

    /**
     * Called by {@link #enter(Context)} when the control node is effectively entered: always for
     * START, END, KILL and FORK nodes, and for a JOIN node only when the decremented fork count is zero.
     *
     * @param context workflow context
     * @throws WorkflowException thrown if an error occurred.
     */
    public abstract void touch(Context context) throws WorkflowException;

    /**
     * Enters a control node and invokes {@link #touch(Context)} when appropriate.
     * <p>
     * For a JOIN node the fork count stored for the parent execution path is decremented. While paths are
     * still pending, the new count is stored and the current execution path is deleted; otherwise the
     * variable is cleared.
     *
     * @param context workflow context
     * @return always <code>false</code>
     * @throws WorkflowException with E0715 if a START node is signalled with a value other than
     *         {@link StartNodeDef#START}, with E0720 if a JOIN node finds no fork count for its parent
     *         execution path, or whatever {@link #touch(Context)} throws.
     * @throws IllegalStateException if the node is not a control node.
     */
    @Override
    public boolean enter(Context context) throws WorkflowException {
        boolean doTouch;
        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
        if (nodeClass.equals(StartNodeDef.class)) {
            // Constant-first comparison: a null signal value is reported as E0715 instead of
            // surfacing as a NullPointerException.
            if (!StartNodeDef.START.equals(context.getSignalValue())) {
                throw new WorkflowException(ErrorCode.E0715, context.getSignalValue());
            }
            doTouch = true;
        }
        else if (nodeClass.equals(EndNodeDef.class) || nodeClass.equals(KillNodeDef.class)
                || nodeClass.equals(ForkNodeDef.class)) {
            doTouch = true;
        }
        else if (nodeClass.equals(JoinNodeDef.class)) {
            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
            int count = decrementedForkCount(context, parentExecutionPath);
            if (count > 0) {
                // Other fork paths are still running: remember the new count and retire this path.
                context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, "" + count);
                context.deleteExecutionPath();
            }
            else {
                // No more paths expected: clear the counter. The execution path is deleted later,
                // in multiExit(), because it is still needed there to resolve the parent path.
                context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, null);
            }
            doTouch = (count == 0);
        }
        else {
            throw new IllegalStateException("Invalid node type: " + nodeClass);
        }
        if (doTouch) {
            touch(context);
        }
        return false;
    }

    /**
     * Exits a START, END or KILL node.
     *
     * @param context workflow context
     * @return the first transition of a START node; <code>null</code> for END (after completing the job)
     *         and KILL (after killing the job).
     * @throws WorkflowException thrown if an error occurred.
     * @throws UnsupportedOperationException for FORK and JOIN nodes, which are handled by
     *         {@link #multiExit(Context)}.
     * @throws IllegalStateException if the node is not a control node.
     */
    @Override
    public String exit(Context context) throws WorkflowException {
        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
        if (nodeClass.equals(StartNodeDef.class)) {
            return context.getNodeDef().getTransitions().get(0);
        }
        else if (nodeClass.equals(EndNodeDef.class)) {
            context.completeJob();
            return null;
        }
        else if (nodeClass.equals(KillNodeDef.class)) {
            context.killJob();
            return null;
        }
        else if (nodeClass.equals(ForkNodeDef.class) || nodeClass.equals(JoinNodeDef.class)) {
            throw new UnsupportedOperationException();
        }
        else {
            throw new IllegalStateException("Invalid node type: " + nodeClass);
        }
    }

    /**
     * Detects a JOIN node being completed more than once.
     * <p>
     * Only JOIN nodes do any work: if the loop flag of the node is already set, E0709 is raised; otherwise
     * the flag is set to <code>"true"</code> when the decremented fork count is zero. The fork count
     * variable itself is not modified here.
     *
     * @param context workflow context
     * @throws WorkflowException with E0709 if the loop flag of the JOIN node is already set, or with E0720
     *         if no fork count exists for the parent execution path.
     * @throws IllegalStateException if the node is not a control node.
     */
    @Override
    public void loopDetection(Context context)
            throws WorkflowException {
        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
        if (!nodeClass.equals(JoinNodeDef.class)) {
            // START/END/KILL/FORK need no loop detection; anything else is not a control node.
            checkControlNode(nodeClass);
            return;
        }
        String flag = getLoopFlag(context.getNodeDef().getName());
        if (context.getVar(flag) != null) {
            throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName());
        }
        String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
        int count = decrementedForkCount(context, parentExecutionPath);
        if (count == 0) {
            context.setVar(flag, "true");
        }
    }

    /**
     * Computes the full transitions to follow when leaving a control node.
     * <p>
     * START, END and KILL delegate to the parent implementation. A FORK records the number of its
     * transitions as the fork count of its execution path and creates one child execution path per
     * transition. A JOIN deletes its execution path and continues on the parent execution path with its
     * first transition.
     *
     * @param context workflow context
     * @return the list of full transitions to follow.
     * @throws WorkflowException thrown if an error occurred.
     * @throws IllegalStateException if the node is not a control node.
     */
    @Override
    public List<String> multiExit(Context context)
            throws WorkflowException {
        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
        if (nodeClass.equals(StartNodeDef.class) || nodeClass.equals(EndNodeDef.class)
                || nodeClass.equals(KillNodeDef.class)) {
            return super.multiExit(context);
        }
        else if (nodeClass.equals(ForkNodeDef.class)) {
            List<String> transitions = context.getNodeDef().getTransitions();
            // Number of paths the matching JOIN has to wait for.
            context.setVar(FORK_COUNT_PREFIX + context.getExecutionPath(), "" + transitions.size());

            List<String> fullTransitions = new ArrayList<String>(transitions.size());

            for (String transition : transitions) {
                String childExecutionPath = context.createExecutionPath(transition);
                String fullTransition = context.createFullTransition(childExecutionPath, transition);
                fullTransitions.add(fullTransition);
            }
            return fullTransitions;
        }
        else if (nodeClass.equals(JoinNodeDef.class)) {
            // Resolve the parent path first; only then is the current execution path deleted.
            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
            context.deleteExecutionPath();

            String transition = context.getNodeDef().getTransitions().get(0);
            String fullTransition = context.createFullTransition(parentExecutionPath, transition);
            List<String> transitions = new ArrayList<String>(1);
            transitions.add(fullTransition);
            return transitions;
        }
        else {
            throw new IllegalStateException("Invalid node type: " + nodeClass);
        }
    }

    /**
     * No-op for every control node.
     *
     * @param context workflow context
     * @throws IllegalStateException if the node is not a control node.
     */
    @Override
    public void kill(Context context) {
        checkControlNode(context.getNodeDef().getClass());
    }

    /**
     * No-op for every control node.
     *
     * @param context workflow context
     * @throws IllegalStateException if the node is not a control node.
     */
    @Override
    public void fail(Context context) {
        checkControlNode(context.getNodeDef().getClass());
    }

    /**
     * Reads the fork count stored for the given parent execution path and returns it decremented by one.
     * The stored variable is not modified.
     *
     * @param context workflow context of the JOIN node
     * @param parentExecutionPath execution path under which the FORK stored its count
     * @return the stored fork count minus one
     * @throws WorkflowException with E0720 if no fork count is stored for the path.
     */
    private static int decrementedForkCount(Context context, String parentExecutionPath)
            throws WorkflowException {
        String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
        if (forkCount == null) {
            throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
        }
        return Integer.parseInt(forkCount) - 1;
    }

    /**
     * Verifies that the given node definition class is one of the control node classes.
     *
     * @param nodeClass concrete class of the current node definition
     * @throws IllegalStateException if the class is not START, END, KILL, FORK or JOIN.
     */
    private static void checkControlNode(Class<? extends NodeDef> nodeClass) {
        boolean controlNode = nodeClass.equals(StartNodeDef.class)
                || nodeClass.equals(EndNodeDef.class)
                || nodeClass.equals(KillNodeDef.class)
                || nodeClass.equals(ForkNodeDef.class)
                || nodeClass.equals(JoinNodeDef.class);
        if (!controlNode) {
            throw new IllegalStateException("Invalid node type: " + nodeClass);
        }
    }
}