001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.coord.input.dependency;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.IOException;
024import java.io.ObjectInputStream;
025import java.io.ObjectOutputStream;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031
032import org.apache.commons.lang.StringUtils;
033import org.apache.oozie.command.coord.CoordCommandUtils;
034import org.apache.oozie.coord.CoordELFunctions;
035import org.apache.oozie.util.WritableUtils;
036
037public class CoordPullInputDependency extends AbstractCoordInputDependency {
038    private Map<String, CoordUnResolvedInputDependency> unResolvedList = new HashMap<String, CoordUnResolvedInputDependency>();
039
040    public CoordPullInputDependency() {
041        super();
042
043    }
044
045    public void addResolvedList(String dataSet, String list) {
046        unResolvedList.get(dataSet).addResolvedList(Arrays.asList(list.split(",")));
047    }
048
049    public CoordUnResolvedInputDependency getUnResolvedDependency(String dataSet) {
050        return unResolvedList.get(dataSet);
051    }
052
053    public boolean isUnResolvedDependencyMet() {
054        for (CoordUnResolvedInputDependency coordUnResolvedDependency : unResolvedList.values()) {
055            if (!coordUnResolvedDependency.isResolved()) {
056                return false;
057            }
058        }
059        return true;
060    }
061
062    public void addUnResolvedList(String dataSet, String dependency) {
063        unResolvedList.put(dataSet, new CoordUnResolvedInputDependency(Arrays.asList(dependency.split("#"))));
064    }
065
066    public String getMissingDependencies() {
067        StringBuffer bf = new StringBuffer(super.getMissingDependencies());
068        String unresolvedMissingDependencies = getUnresolvedMissingDependencies();
069        if (!StringUtils.isEmpty(unresolvedMissingDependencies)) {
070            bf.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR);
071            bf.append(unresolvedMissingDependencies);
072        }
073        return bf.toString();
074    }
075
076    public String getUnresolvedMissingDependencies() {
077        StringBuffer bf = new StringBuffer();
078        if (unResolvedList != null) {
079            for (CoordUnResolvedInputDependency coordUnResolvedDependency : unResolvedList.values()) {
080                if (!coordUnResolvedDependency.isResolved()) {
081                    String unresolvedList = coordUnResolvedDependency.getUnResolvedList();
082                    if (bf.length() > 0 && !unresolvedList.isEmpty()) {
083                        bf.append(CoordELFunctions.INSTANCE_SEPARATOR);
084                    }
085                    bf.append(unresolvedList);
086                }
087            }
088        }
089        return bf.toString();
090    }
091
092    protected void generateDependencies() {
093        super.generateDependencies();
094    }
095
096    private void writeObject(ObjectOutputStream os) throws IOException, ClassNotFoundException {
097        os.writeObject(unResolvedList);
098    }
099
100    @SuppressWarnings("unchecked")
101    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
102        unResolvedList = (Map<String, CoordUnResolvedInputDependency>) in.readObject();
103        generateDependencies();
104    }
105
106    public boolean isDependencyMet() {
107        return isResolvedDependencyMeet() && isUnResolvedDependencyMet();
108
109    }
110
111    public boolean isResolvedDependencyMeet() {
112        return super.isDependencyMet();
113
114    }
115
116    @Override
117    public void write(DataOutput out) throws IOException {
118        super.write(out);
119        WritableUtils.writeMap(out, unResolvedList);
120    }
121
122    @Override
123    public void readFields(DataInput in) throws IOException {
124        super.readFields(in);
125        unResolvedList = WritableUtils.readMap(in, CoordUnResolvedInputDependency.class);
126    }
127
128    @Override
129    public void setMissingDependencies(String join) {
130        // We don't have to set this for input logic. Dependency map will have computed missing dependencies
131    }
132
133    @Override
134    public List<String> getAvailableDependencies(String dataSet) {
135        List<String> availableList = new ArrayList<String>();
136        availableList.addAll(super.getAvailableDependencies(dataSet));
137        if (getUnResolvedDependency(dataSet) != null) {
138            availableList.addAll(getUnResolvedDependency(dataSet).getResolvedList());
139        }
140        return availableList;
141    }
142
143    public boolean isDataSetResolved(String dataSet) {
144        if(unResolvedList.containsKey(dataSet)){
145            return unResolvedList.get(dataSet).isResolved();
146        }
147        else{
148            return super.isDataSetResolved(dataSet);
149        }
150    }
151}