001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.coord.input.dependency; 020 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.IOException; 024import java.io.ObjectInputStream; 025import java.io.ObjectOutputStream; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031 032import org.apache.commons.lang.StringUtils; 033import org.apache.oozie.command.coord.CoordCommandUtils; 034import org.apache.oozie.coord.CoordELFunctions; 035import org.apache.oozie.util.WritableUtils; 036 037public class CoordPullInputDependency extends AbstractCoordInputDependency { 038 private Map<String, CoordUnResolvedInputDependency> unResolvedList = new HashMap<String, CoordUnResolvedInputDependency>(); 039 040 public CoordPullInputDependency() { 041 super(); 042 043 } 044 045 public void addResolvedList(String dataSet, String list) { 046 unResolvedList.get(dataSet).addResolvedList(Arrays.asList(list.split(","))); 047 } 048 049 public CoordUnResolvedInputDependency getUnResolvedDependency(String dataSet) { 050 return unResolvedList.get(dataSet); 051 } 052 053 public boolean isUnResolvedDependencyMet() { 054 for (CoordUnResolvedInputDependency coordUnResolvedDependency : unResolvedList.values()) { 055 if (!coordUnResolvedDependency.isResolved()) { 056 return false; 057 } 058 } 059 return true; 060 } 061 062 public void addUnResolvedList(String dataSet, String dependency) { 063 unResolvedList.put(dataSet, new CoordUnResolvedInputDependency(Arrays.asList(dependency.split("#")))); 064 } 065 066 public String getMissingDependencies() { 067 StringBuffer bf = new StringBuffer(super.getMissingDependencies()); 068 String unresolvedMissingDependencies = getUnresolvedMissingDependencies(); 069 if (!StringUtils.isEmpty(unresolvedMissingDependencies)) { 070 bf.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR); 071 bf.append(unresolvedMissingDependencies); 072 } 073 return bf.toString(); 074 } 075 076 public String getUnresolvedMissingDependencies() { 077 StringBuffer bf = new StringBuffer(); 078 if (unResolvedList != null) { 079 for (CoordUnResolvedInputDependency coordUnResolvedDependency : unResolvedList.values()) { 080 if (!coordUnResolvedDependency.isResolved()) { 081 String unresolvedList = coordUnResolvedDependency.getUnResolvedList(); 082 if (bf.length() > 0 && !unresolvedList.isEmpty()) { 083 bf.append(CoordELFunctions.INSTANCE_SEPARATOR); 084 } 085 bf.append(unresolvedList); 086 } 087 } 088 } 089 return bf.toString(); 090 } 091 092 protected void generateDependencies() { 093 super.generateDependencies(); 094 } 095 096 private void writeObject(ObjectOutputStream os) throws IOException, ClassNotFoundException { 097 os.writeObject(unResolvedList); 098 } 099 100 @SuppressWarnings("unchecked") 101 private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { 102 unResolvedList = (Map<String, CoordUnResolvedInputDependency>) in.readObject(); 103 generateDependencies(); 104 } 105 106 public boolean isDependencyMet() { 107 return isResolvedDependencyMeet() && isUnResolvedDependencyMet(); 108 109 } 110 111 public boolean isResolvedDependencyMeet() { 112 return super.isDependencyMet(); 113 114 } 115 116 @Override 117 public void write(DataOutput out) throws IOException { 118 super.write(out); 119 WritableUtils.writeMap(out, unResolvedList); 120 } 121 122 @Override 123 public void readFields(DataInput in) throws IOException { 124 super.readFields(in); 125 unResolvedList = WritableUtils.readMap(in, CoordUnResolvedInputDependency.class); 126 } 127 128 @Override 129 public void setMissingDependencies(String join) { 130 // We don't have to set this for input logic. Dependency map will have computed missing dependencies 131 } 132 133 @Override 134 public List<String> getAvailableDependencies(String dataSet) { 135 List<String> availableList = new ArrayList<String>(); 136 availableList.addAll(super.getAvailableDependencies(dataSet)); 137 if (getUnResolvedDependency(dataSet) != null) { 138 availableList.addAll(getUnResolvedDependency(dataSet).getResolvedList()); 139 } 140 return availableList; 141 } 142 143 public boolean isDataSetResolved(String dataSet) { 144 if(unResolvedList.containsKey(dataSet)){ 145 return unResolvedList.get(dataSet).isResolved(); 146 } 147 else{ 148 return super.isDataSetResolved(dataSet); 149 } 150 } 151}