001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.coord.input.dependency;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Date;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.Set;
032
033import org.apache.commons.lang.StringUtils;
034import org.apache.hadoop.io.Writable;
035import org.apache.oozie.CoordinatorActionBean;
036import org.apache.oozie.command.CommandException;
037import org.apache.oozie.command.coord.CoordCommandUtils;
038import org.apache.oozie.coord.CoordELFunctions;
039import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
040import org.apache.oozie.dependency.ActionDependency;
041import org.apache.oozie.util.DateUtils;
042import org.apache.oozie.util.WritableUtils;
043import org.jdom.Element;
044import org.jdom.JDOMException;
045
046public abstract class AbstractCoordInputDependency implements Writable, CoordInputDependency {
047    protected boolean isDependencyMet = false;
048    /*
049     * Transient variables only used for processing, not stored in DB.
050     */
051    protected transient Map<String, List<String>> missingDependenciesSet = new HashMap<String, List<String>>();
052    protected transient Map<String, List<String>> availableDependenciesSet = new HashMap<String, List<String>>();
053    protected Map<String, List<CoordInputInstance>> dependencyMap = new HashMap<String, List<CoordInputInstance>>();
054
055    public AbstractCoordInputDependency() {
056    }
057
058
059    public AbstractCoordInputDependency(Map<String, List<CoordInputInstance>> dependencyMap) {
060        this.dependencyMap = dependencyMap;
061        generateDependencies();
062    }
063
064    public void addInputInstanceList(String inputEventName, List<CoordInputInstance> inputInstanceList) {
065        dependencyMap.put(inputEventName, inputInstanceList);
066    }
067
068    public Map<String, List<CoordInputInstance>> getDependencyMap() {
069        return dependencyMap;
070    }
071
072    public void setDependencyMap(Map<String, List<CoordInputInstance>> dependencyMap) {
073        this.dependencyMap = dependencyMap;
074    }
075
076    public void addToAvailableDependencies(String dataSet, CoordInputInstance coordInputInstance) {
077        coordInputInstance.setAvailability(true);
078        List<String> availableSet = availableDependenciesSet.get(dataSet);
079        if (availableSet == null) {
080            availableSet = new ArrayList<String>();
081            availableDependenciesSet.put(dataSet, availableSet);
082        }
083        availableSet.add(coordInputInstance.getInputDataInstance());
084        removeFromMissingDependencies(dataSet, coordInputInstance);
085    }
086
087    public void removeFromMissingDependencies(String dataSet, CoordInputInstance coordInputInstance) {
088        coordInputInstance.setAvailability(true);
089        List<String> missingSet = missingDependenciesSet.get(dataSet);
090        if (missingSet != null) {
091            missingSet.remove(coordInputInstance.getInputDataInstance());
092            if (missingSet.isEmpty()) {
093                missingDependenciesSet.remove(dataSet);
094            }
095        }
096
097    }
098
099    public void addToMissingDependencies(String dataSet, CoordInputInstance coordInputInstance) {
100        List<String> availableSet = missingDependenciesSet.get(dataSet);
101        if (availableSet == null) {
102            availableSet = new ArrayList<String>();
103        }
104        availableSet.add(coordInputInstance.getInputDataInstance());
105        missingDependenciesSet.put(dataSet, availableSet);
106
107    }
108
109    protected void generateDependencies() {
110        try {
111            missingDependenciesSet = new HashMap<String, List<String>>();
112            availableDependenciesSet = new HashMap<String, List<String>>();
113
114            Set<String> keySets = dependencyMap.keySet();
115            for (String key : keySets) {
116                for (CoordInputInstance coordInputInstance : dependencyMap.get(key))
117                    if (coordInputInstance.isAvailable()) {
118                        addToAvailableDependencies(key, coordInputInstance);
119                    }
120                    else {
121                        addToMissingDependencies(key, coordInputInstance);
122                    }
123            }
124        }
125        catch (Exception e) {
126            throw new RuntimeException(e);
127        }
128
129    }
130
131    public List<String> getAvailableDependencies(String dataSet) {
132        if (availableDependenciesSet.get(dataSet) != null) {
133            return availableDependenciesSet.get(dataSet);
134        }
135        else {
136            return new ArrayList<String>();
137        }
138
139    }
140
141    public String getMissingDependencies(String dataSet) {
142        StringBuilder sb = new StringBuilder();
143        for (String dependencies : missingDependenciesSet.get(dataSet)) {
144            sb.append(dependencies).append("#");
145        }
146        return sb.toString();
147    }
148
149    public void addToAvailableDependencies(String dataSet, String availableSet) {
150        List<CoordInputInstance> list = dependencyMap.get(dataSet);
151        if (list == null) {
152            list = new ArrayList<CoordInputInstance>();
153            dependencyMap.put(dataSet, list);
154        }
155
156        for (String available : availableSet.split(CoordELFunctions.INSTANCE_SEPARATOR)) {
157            CoordInputInstance coordInstance = new CoordInputInstance(available, true);
158            list.add(coordInstance);
159            addToAvailableDependencies(dataSet, coordInstance);
160        }
161
162    }
163
164    public String getMissingDependencies() {
165        StringBuilder sb = new StringBuilder();
166        if (missingDependenciesSet != null) {
167            for (List<String> dependenciesList : missingDependenciesSet.values()) {
168                for (String dependencies : dependenciesList) {
169                    sb.append(dependencies).append("#");
170                }
171            }
172        }
173        return sb.toString();
174    }
175
176    public List<String> getMissingDependenciesAsList() {
177        List<String> missingDependencies = new ArrayList<String>();
178        for (List<String> dependenciesList : missingDependenciesSet.values()) {
179            missingDependencies.addAll(dependenciesList);
180        }
181        return missingDependencies;
182    }
183
184    public List<String> getAvailableDependenciesAsList() {
185        List<String> availableDependencies = new ArrayList<String>();
186        for (List<String> dependenciesList : availableDependenciesSet.values()) {
187            availableDependencies.addAll(dependenciesList);
188
189        }
190        return availableDependencies;
191    }
192
193    public String serialize() throws IOException {
194        return CoordInputDependencyFactory.getMagicNumber()
195                + new String(WritableUtils.toByteArray(this), CoordInputDependencyFactory.CHAR_ENCODING);
196
197    }
198
199    public String getListAsString(List<String> dataSets) {
200        StringBuilder sb = new StringBuilder();
201        for (String dependencies : dataSets) {
202            sb.append(dependencies).append("#");
203        }
204
205        return sb.toString();
206    }
207
208    public void setDependencyMet(boolean isDependencyMeet) {
209        this.isDependencyMet = isDependencyMeet;
210    }
211
212    public boolean isDependencyMet() {
213        return missingDependenciesSet.isEmpty() || isDependencyMet;
214    }
215
216    public boolean isUnResolvedDependencyMet() {
217        return false;
218    }
219
220
221    @Override
222    public void addToAvailableDependencies(Collection<String> availableList) {
223        for (Entry<String, List<CoordInputInstance>> dependenciesList : dependencyMap.entrySet()) {
224            for (CoordInputInstance coordInputInstance : dependenciesList.getValue()) {
225                if (availableList.contains(coordInputInstance.getInputDataInstance()))
226                    addToAvailableDependencies(dependenciesList.getKey(), coordInputInstance);
227            }
228        }
229    }
230
231    @Override
232    public ActionDependency checkPushMissingDependencies(CoordinatorActionBean coordAction,
233            boolean registerForNotification) throws CommandException, IOException,
234            JDOMException {
235        boolean status = new CoordInputLogicEvaluatorUtil(coordAction).checkPushDependencies();
236        if (status) {
237            coordAction.getPushInputDependencies().setDependencyMet(true);
238        }
239        return new ActionDependency(coordAction.getPushInputDependencies().getMissingDependenciesAsList(), coordAction
240                .getPushInputDependencies().getAvailableDependenciesAsList());
241
242    }
243
244    public boolean checkPullMissingDependencies(CoordinatorActionBean coordAction,
245            StringBuilder existList, StringBuilder nonExistList) throws IOException, JDOMException {
246        boolean status = new CoordInputLogicEvaluatorUtil(coordAction).checkPullMissingDependencies();
247        if (status) {
248            coordAction.getPullInputDependencies().setDependencyMet(true);
249        }
250        return status;
251
252    }
253
254    public boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
255            StringBuilder nonResolvedList, boolean status) {
256        if (!StringUtils.isEmpty(missingDependencies)) {
257            return !missingDependencies.equals(getMissingDependencies());
258        }
259        else {
260            return true;
261        }
262    }
263
264    @SuppressWarnings("unchecked")
265    public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction)
266            throws Exception {
267        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
268        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
269        Date actualTime = null;
270        if (actualTimeStr == null) {
271            actualTime = new Date();
272        }
273        else {
274            actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
275        }
276        if (inputList == null) {
277            return true;
278        }
279        List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace());
280        for (Element dEvent : eDataEvents) {
281            if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) == null) {
282                continue;
283            }
284            String unResolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG,
285                    dEvent.getNamespace()).getTextTrim();
286            String name = dEvent.getAttribute("name").getValue();
287            addUnResolvedList(name, unResolvedInstance);
288        }
289        return new CoordInputLogicEvaluatorUtil(coordAction).checkUnResolved(actualTime);
290    }
291
292    @Override
293    public void write(DataOutput out) throws IOException {
294        WritableUtils.writeStringAsBytes(out,INTERNAL_VERSION_ID);
295        out.writeBoolean(isDependencyMet);
296        WritableUtils.writeMapWithList(out, dependencyMap);
297
298    }
299
300    @Override
301    public void readFields(DataInput in) throws IOException {
302        WritableUtils.readBytesAsString(in);
303        this.isDependencyMet = in.readBoolean();
304        dependencyMap = WritableUtils.readMapWithList(in, CoordInputInstance.class);
305        generateDependencies();
306    }
307
308    public boolean isDataSetResolved(String dataSet){
309        if(getAvailableDependencies(dataSet) ==null|| getDependencyMap().get(dataSet) == null){
310            return false;
311        }
312        return getAvailableDependencies(dataSet).size() == getDependencyMap().get(dataSet).size();
313    }
314
315}