001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.workflow.lite; 019 020 import java.util.ArrayList; 021 import java.util.Arrays; 022 import java.util.List; 023 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.workflow.WorkflowException; 026 027 /** 028 * Node definition for JOIN control node. 029 */ 030 public class JoinNodeDef extends ControlNodeDef { 031 032 JoinNodeDef() { 033 } 034 035 public JoinNodeDef(String name, Class<? extends ControlNodeHandler> klass, String transition) { 036 super(name, "", klass, Arrays.asList(transition)); 037 } 038 039 public static class JoinNodeHandler extends NodeHandler { 040 041 public void loopDetection(Context context) throws WorkflowException { 042 String flag = getLoopFlag(context.getNodeDef().getName()); 043 if (context.getVar(flag) != null) { 044 throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName()); 045 } 046 String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath()); 047 String forkCount = context.getVar(ControlNodeHandler.FORK_COUNT_PREFIX + parentExecutionPath); 048 if (forkCount == null) { 049 throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName()); 050 } 051 int count = Integer.parseInt(forkCount) - 1; 052 if (count == 0) { 053 context.setVar(flag, "true"); 054 } 055 } 056 057 public boolean enter(Context context) throws WorkflowException { 058 String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath()); 059 String forkCount = context.getVar(ControlNodeHandler.FORK_COUNT_PREFIX + parentExecutionPath); 060 if (forkCount == null) { 061 throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName()); 062 } 063 int count = Integer.parseInt(forkCount) - 1; 064 if (count > 0) { 065 context.setVar(ControlNodeHandler.FORK_COUNT_PREFIX + parentExecutionPath, "" + count); 066 context.deleteExecutionPath(); 067 } 068 else { 069 context.setVar(ControlNodeHandler.FORK_COUNT_PREFIX + parentExecutionPath, null); 070 } 071 return (count == 0); 072 } 073 074 public List<String> multiExit(Context context) { 075 String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath()); 076 // NOW we delete.. 077 context.deleteExecutionPath(); 078 079 String transition = context.getNodeDef().getTransitions().get(0); 080 String fullTransition = context.createFullTransition(parentExecutionPath, transition); 081 List<String> transitions = new ArrayList<String>(1); 082 transitions.add(fullTransition); 083 return transitions; 084 } 085 086 public String exit(Context context) { 087 throw new UnsupportedOperationException(); 088 } 089 090 public void kill(Context context) { 091 } 092 093 public void fail(Context context) { 094 } 095 } 096 }