001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.workflow.lite; 020 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.List; 024 025import org.apache.oozie.ErrorCode; 026import org.apache.oozie.workflow.WorkflowException; 027 028/** 029 * Node definition for JOIN control node. 030 */ 031public class JoinNodeDef extends ControlNodeDef { 032 033 JoinNodeDef() { 034 } 035 036 public JoinNodeDef(String name, Class<? extends ControlNodeHandler> klass, String transition) { 037 super(name, "", klass, Arrays.asList(transition)); 038 } 039 040 public static class JoinNodeHandler extends NodeHandler { 041 042 public void loopDetection(Context context) throws WorkflowException { 043 String flag = getLoopFlag(context.getNodeDef().getName()); 044 if (context.getVar(flag) != null) { 045 throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName()); 046 } 047 String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath()); 048 String forkCount = context.getVar(ControlNodeHandler.FORK_COUNT_PREFIX + parentExecutionPath); 049 if (forkCount == null) { 050 throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName()); 051 } 052 int count = Integer.parseInt(forkCount) - 1; 053 if (count == 0) { 054 context.setVar(flag, "true"); 055 } 056 } 057 058 public boolean enter(Context context) throws WorkflowException { 059 String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath()); 060 String forkCount = context.getVar(ControlNodeHandler.FORK_COUNT_PREFIX + parentExecutionPath); 061 if (forkCount == null) { 062 throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName()); 063 } 064 int count = Integer.parseInt(forkCount) - 1; 065 if (count > 0) { 066 context.setVar(ControlNodeHandler.FORK_COUNT_PREFIX + parentExecutionPath, "" + count); 067 context.deleteExecutionPath(); 068 } 069 else { 070 context.setVar(ControlNodeHandler.FORK_COUNT_PREFIX + parentExecutionPath, null); 071 } 072 return (count == 0); 073 } 074 075 public List<String> multiExit(Context context) { 076 String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath()); 077 // NOW we delete.. 078 context.deleteExecutionPath(); 079 080 String transition = context.getNodeDef().getTransitions().get(0); 081 String fullTransition = context.createFullTransition(parentExecutionPath, transition); 082 List<String> transitions = new ArrayList<String>(1); 083 transitions.add(fullTransition); 084 return transitions; 085 } 086 087 public String exit(Context context) { 088 throw new UnsupportedOperationException(); 089 } 090 091 public void kill(Context context) { 092 } 093 094 public void fail(Context context) { 095 } 096 } 097}