001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.coord.input.logic;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.net.URISyntaxException;
024import java.text.MessageFormat;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Date;
028import java.util.List;
029import java.util.Map;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.oozie.CoordinatorActionBean;
033import org.apache.oozie.ErrorCode;
034import org.apache.oozie.command.coord.CoordCommandUtils;
035import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency;
036import org.apache.oozie.coord.input.dependency.CoordInputDependency;
037import org.apache.oozie.coord.input.dependency.CoordInputInstance;
038import org.apache.oozie.coord.input.dependency.CoordPullInputDependency;
039import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorResult.STATUS;
040import org.apache.oozie.dependency.URIHandlerException;
041import org.apache.oozie.util.LogUtils;
042import org.apache.oozie.util.XConfiguration;
043import org.apache.oozie.util.XLog;
044
045/**
046 * PhaseOne is for all dependencies check, except unresolved. Unresolved will be checked as part of phaseTwo.
047 * Phasethree is only to get dependencies from dataset, no hdfs/hcat check.
048 */
049public class CoordInputLogicEvaluatorPhaseOne implements CoordInputLogicEvaluator {
050
051    protected AbstractCoordInputDependency coordInputDependency;
052    protected Map<String, List<CoordInputInstance>> dependencyMap;
053    protected CoordinatorActionBean coordAction = null;
054    protected XLog log = XLog.getLog(getClass());
055
056    public CoordInputLogicEvaluatorPhaseOne(CoordinatorActionBean coordAction) {
057        this(coordAction, coordAction.getPullInputDependencies());
058    }
059
060    public CoordInputLogicEvaluatorPhaseOne(CoordinatorActionBean coordAction, CoordInputDependency coordInputDependency) {
061        this.coordAction = coordAction;
062        this.coordInputDependency = (AbstractCoordInputDependency) coordInputDependency;
063        dependencyMap = ((AbstractCoordInputDependency) coordInputDependency).getDependencyMap();
064        LogUtils.setLogInfo(coordAction.getId());
065
066    }
067
068    public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) {
069        return input(coordInputDependency, dataSet, min, wait);
070
071    }
072
073    /**
074     * Evaluate input function with min and wait
075     *
076     * @param coordInputDependency
077     * @param dataSet
078     * @param min
079     * @param wait
080     * @return the coord input logic evaluator result
081     */
082    public CoordInputLogicEvaluatorResult input(AbstractCoordInputDependency coordInputDependency, String dataSet,
083            int min, int wait) {
084
085        List<String> availableList = new ArrayList<String>();
086        if (coordInputDependency.getDependencyMap().get(dataSet) == null) {
087            CoordInputLogicEvaluatorResult retData = new CoordInputLogicEvaluatorResult();
088            if (((CoordPullInputDependency) coordAction.getPullInputDependencies()).getUnResolvedDependency(dataSet) != null) {
089                log.debug("Data set [{0}] is unresolved set, will get resolved in phasetwo", dataSet);
090                retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.PHASE_TWO_EVALUATION);
091            }
092            else {
093                return getResultFromPullPush(coordAction, dataSet, min);
094            }
095            return retData;
096        }
097        boolean allFound = true;
098        try {
099            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
100            List<CoordInputInstance> firstInputSetList = coordInputDependency.getDependencyMap().get(dataSet);
101            for (int i = 0; i < firstInputSetList.size(); i++) {
102                CoordInputInstance coordInputInstance = firstInputSetList.get(i);
103                if (!coordInputInstance.isAvailable()) {
104                    if (pathExists(coordInputInstance.getInputDataInstance(), actionConf)) {
105                        availableList.add(coordInputInstance.getInputDataInstance());
106                        coordInputDependency.addToAvailableDependencies(dataSet, coordInputInstance);
107                    }
108                    else {
109                        log.debug("[{0} is not found ", coordInputInstance.getInputDataInstance());
110                        allFound = false;
111                        // Stop looking for dependencies, if min is not specified.
112                        if (min < 0) {
113                            break;
114                        }
115                    }
116                }
117                else {
118                    availableList.add(coordInputInstance.getInputDataInstance());
119                }
120            }
121        }
122        catch (Exception e) {
123            log.error(e);
124            throw new RuntimeException(ErrorCode.E1028.format("Error executing input function " + e.getMessage()));
125        }
126        CoordInputLogicEvaluatorResult retData = getEvalResult(allFound, min, wait, availableList);
127
128        log.debug("Resolved status of Data set [{0}] with min [{1}] and wait [{2}]  =  [{3}]", dataSet, min, wait,
129                retData.getStatus());
130        return retData;
131    }
132
133    public boolean isInputWaitElapsed(int timeInMin) {
134
135        if (timeInMin == -1) {
136            return true;
137        }
138        long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
139                .getCreatedTime().getTime()))
140                / (60 * 1000);
141        return timeInMin <= waitingTime;
142    }
143
144    public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) {
145        return combine(coordInputDependency, inputSets, min, wait);
146    }
147
148    public CoordInputLogicEvaluatorResult combine(AbstractCoordInputDependency coordInputDependency,
149            String[] inputSets, int min, int wait) {
150
151        List<String> availableList = new ArrayList<String>();
152
153        if (coordInputDependency.getDependencyMap().get(inputSets[0]) == null) {
154            return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.TIMED_WAITING);
155        }
156
157        try {
158
159            Configuration jobConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
160            String firstInputSet = inputSets[0];
161            List<CoordInputInstance> firstInputSetList = coordInputDependency.getDependencyMap().get(firstInputSet);
162            for (int i = 0; i < firstInputSetList.size(); i++) {
163                CoordInputInstance coordInputInstance = firstInputSetList.get(i);
164                boolean found = false;
165                if (!coordInputInstance.isAvailable()) {
166                    if (!pathExists(coordInputInstance.getInputDataInstance(), jobConf)) {
167                        log.debug(MessageFormat.format("{0} is not found. Looking from other datasets.",
168                                coordInputInstance.getInputDataInstance()));
169                        for (int j = 1; j < inputSets.length; j++) {
170                            if (!coordInputDependency.getDependencyMap().get(inputSets[j]).get(i).isAvailable()) {
171                                if (pathExists(coordInputDependency.getDependencyMap().get(inputSets[j]).get(i)
172                                        .getInputDataInstance(), jobConf)) {
173                                    coordInputDependency.addToAvailableDependencies(inputSets[j], coordInputDependency
174                                            .getDependencyMap().get(inputSets[j]).get(i));
175                                    availableList.add(coordInputDependency.getDependencyMap().get(inputSets[j]).get(i)
176                                            .getInputDataInstance());
177                                    log.debug(MessageFormat.format("{0} is found.",
178                                            coordInputInstance.getInputDataInstance()));
179                                    found = true;
180                                }
181
182                            }
183                            else {
184                                coordInputDependency.addToAvailableDependencies(inputSets[j], coordInputDependency
185                                        .getDependencyMap().get(inputSets[j]).get(i));
186                                availableList.add(coordInputDependency.getDependencyMap().get(inputSets[j]).get(i)
187                                        .getInputDataInstance());
188                                found = true;
189
190                            }
191                        }
192                    }
193                    else {
194                        coordInputDependency.addToAvailableDependencies(firstInputSet, coordInputInstance);
195                        availableList.add(coordInputInstance.getInputDataInstance());
196                        found = true;
197                    }
198                }
199                else {
200                    availableList.add(coordInputInstance.getInputDataInstance());
201                    found = true;
202                }
203
204                if (min < 0 && !found) {
205                    // Stop looking for dependencies, if min is not specified.
206                    break;
207                }
208
209            }
210        }
211        catch (Exception e) {
212            log.error(e);
213            throw new RuntimeException(ErrorCode.E1028.format("Error executing combine function " + e.getMessage()));
214        }
215        boolean allFound = availableList.size() == coordInputDependency.getDependencyMap().get(inputSets[0]).size();
216        CoordInputLogicEvaluatorResult retData = getEvalResult(allFound, min, wait, availableList);
217        log.debug("Resolved status of Data set [{0}] with min [{1}] and wait [{2}]  =  [{3}]",
218                Arrays.toString(inputSets), min, wait, retData.getStatus());
219        return retData;
220
221    }
222
223    public Configuration getConf() throws IOException {
224        return new XConfiguration(new StringReader(coordAction.getRunConf()));
225
226    }
227
228    public String getListAsString(List<String> list, String dataset) {
229        if (list == null || list.isEmpty()) {
230            return "";
231        }
232        StringBuilder sb = new StringBuilder();
233        for (int i = 1; i < list.size(); i++) {
234            sb.append(list.get(i - 1)).append(",");
235        }
236        sb.append(list.get(list.size() - 1));
237        return sb.toString();
238    }
239
240    protected CoordInputLogicEvaluatorResult getEvalResult(boolean found, int min, int wait, List<String> availableList) {
241        CoordInputLogicEvaluatorResult retData = new CoordInputLogicEvaluatorResult();
242        if (!found && wait > 0) {
243            if (!isInputWaitElapsed(wait)) {
244                return new CoordInputLogicEvaluatorResult(STATUS.TIMED_WAITING);
245            }
246        }
247
248        if (found || (min > 0 && availableList.size() >= min)) {
249            retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
250            retData.setDataSets(getListAsString(availableList, null));
251        }
252
253        if (min == 0) {
254            retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
255        }
256
257        return retData;
258    }
259
260    protected boolean pathExists(String sPath, Configuration jobConf) throws IOException, URISyntaxException,
261            URIHandlerException {
262        return CoordCommandUtils.pathExists(sPath, jobConf);
263
264    }
265
266    public CoordInputLogicEvaluatorResult getResultFromPullPush(CoordinatorActionBean coordAction, String dataSet, int min) {
267        CoordInputLogicEvaluatorResult result = new CoordInputLogicEvaluatorResult();
268        CoordInputLogicEvaluatorResult pullResult = getEvalResult(
269                (AbstractCoordInputDependency) coordAction.getPullInputDependencies(), dataSet, min);
270        CoordInputLogicEvaluatorResult pushResult = getEvalResult(
271                (AbstractCoordInputDependency) coordAction.getPushInputDependencies(), dataSet, min);
272        result.appendDataSets(pullResult.getDataSets());
273        result.appendDataSets(pushResult.getDataSets());
274
275        if (pullResult.isWaiting() || pushResult.isWaiting()) {
276            result.setStatus(STATUS.TIMED_WAITING);
277        }
278
279        else if (pullResult.isPhaseTwoEvaluation() || pushResult.isPhaseTwoEvaluation()) {
280            result.setStatus(STATUS.PHASE_TWO_EVALUATION);
281        }
282
283        else if (pullResult.isTrue() || pushResult.isTrue()) {
284            result.setStatus(STATUS.TRUE);
285        }
286        else {
287            result.setStatus(STATUS.FALSE);
288        }
289        return result;
290
291    }
292
293    /**
294     * Gets evaluator Result
295     *
296     * @param coordInputDependencies the coord dependencies
297     * @param dataSet the data set
298     * @param min the min
299     * @return the coord input logic evaluator result
300     */
301    public CoordInputLogicEvaluatorResult getEvalResult(AbstractCoordInputDependency coordInputDependencies,
302            String dataSet, int min) {
303        CoordInputLogicEvaluatorResult result = new CoordInputLogicEvaluatorResult();
304        if ((coordInputDependencies.getAvailableDependencies(dataSet) == null || coordInputDependencies
305                .getAvailableDependencies(dataSet).isEmpty())) {
306            if (min == 0) {
307                result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
308            }
309            else {
310                result.setStatus(CoordInputLogicEvaluatorResult.STATUS.FALSE);
311            }
312        }
313
314        if (min > -1 && coordInputDependencies.getAvailableDependencies(dataSet).size() >= min) {
315            result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
316            result.appendDataSets(getListAsString(coordInputDependencies.getAvailableDependencies(dataSet), dataSet));
317        }
318
319        else if (coordInputDependencies.isDataSetResolved(dataSet)) {
320            result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
321            result.appendDataSets(getListAsString(coordInputDependencies.getAvailableDependencies(dataSet), dataSet));
322        }
323        return result;
324    }
325}