001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.coord.input.dependency;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.net.URISyntaxException;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.Date;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034
035import org.apache.commons.lang.StringUtils;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.security.AccessControlException;
038import org.apache.oozie.CoordinatorActionBean;
039import org.apache.oozie.ErrorCode;
040import org.apache.oozie.client.OozieClient;
041import org.apache.oozie.command.CommandException;
042import org.apache.oozie.command.coord.CoordCommandUtils;
043import org.apache.oozie.coord.CoordELConstants;
044import org.apache.oozie.coord.CoordELEvaluator;
045import org.apache.oozie.coord.CoordELFunctions;
046import org.apache.oozie.coord.CoordUtils;
047import org.apache.oozie.dependency.ActionDependency;
048import org.apache.oozie.dependency.DependencyChecker;
049import org.apache.oozie.dependency.URIHandler;
050import org.apache.oozie.dependency.URIHandlerException;
051import org.apache.oozie.service.Services;
052import org.apache.oozie.service.URIHandlerService;
053import org.apache.oozie.util.DateUtils;
054import org.apache.oozie.util.ELEvaluator;
055import org.apache.oozie.util.ParamChecker;
056import org.apache.oozie.util.XConfiguration;
057import org.apache.oozie.util.XLog;
058import org.apache.oozie.util.XmlUtils;
059import org.jdom.Element;
060import org.jdom.JDOMException;
061
062/**
063 * Old approach where dependencies are stored as String.
064 */
065public class CoordOldInputDependency implements CoordInputDependency {
066
067    private XLog log = XLog.getLog(getClass());
068
069    protected transient String missingDependencies = "";
070
071    public CoordOldInputDependency(String missingDependencies) {
072        this.missingDependencies = missingDependencies;
073    }
074
075    public CoordOldInputDependency() {
076    }
077
078    @Override
079    public void addInputInstanceList(String inputEventName, List<CoordInputInstance> inputInstanceList) {
080        appendToDependencies(inputInstanceList);
081    }
082
083    @Override
084    public String getMissingDependencies() {
085        return missingDependencies;
086    }
087
088    @Override
089    public boolean isDependencyMet() {
090        return StringUtils.isEmpty(missingDependencies);
091    }
092
093    @Override
094    public boolean isUnResolvedDependencyMet() {
095        return false;
096    }
097
098    @Override
099    public void setDependencyMet(boolean isDependencyMeet) {
100        if (isDependencyMeet) {
101            missingDependencies = "";
102        }
103
104    }
105
106    @Override
107    public String serialize() throws IOException {
108        return missingDependencies;
109    }
110
111    @Override
112    public List<String> getMissingDependenciesAsList() {
113        return Arrays.asList(DependencyChecker.dependenciesAsArray(missingDependencies));
114    }
115
116    @Override
117    public List<String> getAvailableDependenciesAsList() {
118        return new ArrayList<String>();
119    }
120
121    @Override
122    public void setMissingDependencies(String missingDependencies) {
123        this.missingDependencies = missingDependencies;
124
125    }
126
127    public void appendToDependencies(List<CoordInputInstance> inputInstanceList) {
128        StringBuilder sb = new StringBuilder(missingDependencies);
129        boolean isFirst = true;
130        for (CoordInputInstance coordInputInstance : inputInstanceList) {
131            if (isFirst) {
132                if (!StringUtils.isEmpty(sb.toString())) {
133                    sb.append(CoordELFunctions.INSTANCE_SEPARATOR);
134                }
135            }
136            else {
137                sb.append(CoordELFunctions.INSTANCE_SEPARATOR);
138
139            }
140            sb.append(coordInputInstance.getInputDataInstance());
141            isFirst = false;
142        }
143        missingDependencies = sb.toString();
144    }
145
146    @Override
147    public void addUnResolvedList(String name, String unresolvedDependencies) {
148        StringBuilder sb = new StringBuilder(missingDependencies);
149        sb.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedDependencies);
150        missingDependencies = sb.toString();
151    }
152
153    @Override
154    public List<String> getAvailableDependencies(String dataSet) {
155        return null;
156    }
157
158    @Override
159    public void addToAvailableDependencies(Collection<String> availableList) {
160
161        if (StringUtils.isEmpty(missingDependencies)) {
162            return;
163        }
164        List<String> missingDependenciesList = new ArrayList<String>(
165                Arrays.asList((DependencyChecker.dependenciesAsArray(missingDependencies))));
166        missingDependenciesList.removeAll(availableList);
167        missingDependencies = DependencyChecker.dependenciesAsString(missingDependenciesList);
168
169    }
170
171    @Override
172    public boolean checkPullMissingDependencies(CoordinatorActionBean coordAction, StringBuilder existList,
173            StringBuilder nonExistList) throws IOException, JDOMException {
174        Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
175        Element eAction = XmlUtils.parseXml(coordAction.getActionXml());
176
177        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
178        if (inputList != null) {
179            if (nonExistList.length() > 0) {
180                checkListOfPaths(coordAction, existList, nonExistList, actionConf);
181            }
182            return nonExistList.length() == 0;
183        }
184        return true;
185    }
186
187    public ActionDependency checkPushMissingDependencies(CoordinatorActionBean coordAction,
188            boolean registerForNotification) throws CommandException, IOException {
189        return DependencyChecker.checkForAvailability(getMissingDependenciesAsList(),
190                new XConfiguration(new StringReader(coordAction.getRunConf())), !registerForNotification);
191    }
192
193    private boolean checkListOfPaths(CoordinatorActionBean coordAction, StringBuilder existList,
194            StringBuilder nonExistList, Configuration conf) throws IOException {
195
196        String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
197        if (uriList[0] != null) {
198            log.info("[" + coordAction.getId() + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0]
199                    + " is Missing.");
200        }
201
202        nonExistList.delete(0, nonExistList.length());
203        boolean allExists = true;
204        String existSeparator = "", nonExistSeparator = "";
205        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
206        for (int i = 0; i < uriList.length; i++) {
207            if (allExists) {
208                allExists = pathExists(coordAction, uriList[i], conf, user);
209                log.info("[" + coordAction.getId() + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :"
210                        + allExists);
211            }
212            if (allExists) {
213                existList.append(existSeparator).append(uriList[i]);
214                existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
215            }
216            else {
217                nonExistList.append(nonExistSeparator).append(uriList[i]);
218                nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
219            }
220        }
221        return allExists;
222    }
223
224    public boolean pathExists(CoordinatorActionBean coordAction, String sPath, Configuration actionConf, String user)
225            throws IOException {
226        log.debug("checking for the file " + sPath);
227        try {
228            return CoordCommandUtils.pathExists(sPath, actionConf, user);
229        }
230        catch (URIHandlerException e) {
231            if (coordAction != null) {
232                coordAction.setErrorCode(e.getErrorCode().toString());
233                coordAction.setErrorMessage(e.getMessage());
234            }
235            if (e.getCause() != null && e.getCause() instanceof AccessControlException) {
236                throw (AccessControlException) e.getCause();
237            }
238            else {
239                log.error(e);
240                throw new IOException(e);
241            }
242        }
243        catch (URISyntaxException e) {
244            if (coordAction != null) {
245                coordAction.setErrorCode(ErrorCode.E0906.toString());
246                coordAction.setErrorMessage(e.getMessage());
247            }
248            log.error(e);
249            throw new IOException(e);
250        }
251    }
252
253    public boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
254            StringBuilder nonResolvedList, boolean status) {
255        if ((!nonExistList.toString().equals(missingDependencies) || missingDependencies.isEmpty())) {
256            setMissingDependencies(nonExistList.toString());
257            return true;
258        }
259        return false;
260    }
261
262    @SuppressWarnings("unchecked")
263    public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction) throws Exception {
264        Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
265        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
266        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
267
268        if (inputList == null) {
269            return true;
270        }
271
272        List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace());
273        Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
274
275        if (eDataEvents != null) {
276            Date actualTime = null;
277            if (actualTimeStr == null) {
278                actualTime = new Date();
279            }
280            else {
281                actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
282            }
283
284            for (Element dEvent : eDataEvents) {
285                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) == null) {
286                    continue;
287                }
288                ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, actionConf);
289                String unResolvedInstance = dEvent
290                        .getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()).getTextTrim();
291                String unresolvedList[] = unResolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
292                StringBuffer resolvedTmp = new StringBuffer();
293                for (int i = 0; i < unresolvedList.length; i++) {
294                    String returnData = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
295                    Boolean isResolved = (Boolean) eval.getVariable(CoordELConstants.IS_RESOLVED);
296                    if (isResolved == false) {
297                        log.info("[" + coordAction.getId() + "] :: Cannot resolve : " + returnData);
298                        return false;
299                    }
300                    if (resolvedTmp.length() > 0) {
301                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
302                    }
303                    resolvedTmp.append((String) eval.getVariable(CoordELConstants.RESOLVED_PATH));
304                }
305                if (resolvedTmp.length() > 0) {
306                    if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
307                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR)
308                                .append(dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
309                        dEvent.removeChild("uris", dEvent.getNamespace());
310                    }
311                    Element uriInstance = new Element("uris", dEvent.getNamespace());
312                    uriInstance.addContent(resolvedTmp.toString());
313                    dEvent.getContent().add(1, uriInstance);
314                }
315                dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace());
316            }
317        }
318
319        return true;
320    }
321
322    public Map<String, ActionDependency> getMissingDependencies(CoordinatorActionBean coordAction)
323            throws CommandException, IOException, JDOMException {
324
325        Map<String, ActionDependency> dependenciesMap = null;
326        try {
327            dependenciesMap = getDependency(coordAction);
328        }
329        catch (URIHandlerException e) {
330            throw new IOException(e);
331        }
332
333        StringBuilder nonExistList = new StringBuilder();
334        StringBuilder nonResolvedList = new StringBuilder();
335        CoordCommandUtils.getResolvedList(getMissingDependencies(), nonExistList, nonResolvedList);
336
337        Set<String> missingSets = new HashSet<String>(
338                Arrays.asList(nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)));
339
340        missingSets.addAll(
341                Arrays.asList(nonResolvedList.toString().split(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR)));
342
343        for (Iterator<Map.Entry<String, ActionDependency>> it = dependenciesMap.entrySet().iterator(); it.hasNext();) {
344            Map.Entry<String, ActionDependency> entry = it.next();
345            ActionDependency dependency = entry.getValue();
346            dependency.getMissingDependencies().retainAll(missingSets);
347
348            if (dependency.getUriTemplate() != null) {
349                for (int i = 0; i < dependency.getMissingDependencies().size(); i++) {
350                    if (dependency.getMissingDependencies().get(i).trim().startsWith("${coord:")) {
351                        dependency.getMissingDependencies().set(i,
352                                dependency.getMissingDependencies().get(i) + " -> " + dependency.getUriTemplate());
353                    }
354                }
355            }
356
357            if (dependenciesMap.get(entry.getKey()).getMissingDependencies().isEmpty()) {
358                it.remove();
359            }
360
361        }
362        return dependenciesMap;
363    }
364
365    @SuppressWarnings("unchecked")
366    private Map<String, ActionDependency> getDependency(CoordinatorActionBean coordAction)
367            throws JDOMException, URIHandlerException {
368        Map<String, ActionDependency> dependenciesMap = new HashMap<String, ActionDependency>();
369        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
370
371        Element eAction = XmlUtils.parseXml(coordAction.getActionXml());
372        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
373        List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace());
374        for (Element event : eDataEvents) {
375            Element uri = event.getChild("uris", event.getNamespace());
376            ActionDependency dependency = new ActionDependency();
377            if (uri != null) {
378                Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag",
379                        event.getNamespace());
380                String[] dataSets = uri.getText().split(CoordELFunctions.INSTANCE_SEPARATOR);
381                String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
382
383                for (String dataSet : dataSets) {
384                    URIHandler uriHandler;
385                    uriHandler = uriService.getURIHandler(dataSet);
386                    dependency.getMissingDependencies().add(uriHandler.getURIWithDoneFlag(dataSet, doneFlag));
387                }
388            }
389            if (event.getChildTextTrim(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, event.getNamespace()) != null) {
390                ActionDependency unResolvedDependency = getUnResolvedDependency(coordAction, event);
391                dependency.getMissingDependencies()
392                        .addAll(unResolvedDependency.getMissingDependencies());
393                dependency.setUriTemplate(unResolvedDependency.getUriTemplate());
394            }
395            dependenciesMap.put(event.getAttributeValue("name"), dependency);
396        }
397        return dependenciesMap;
398    }
399
400    private ActionDependency getUnResolvedDependency(CoordinatorActionBean coordAction, Element event)
401            throws JDOMException, URIHandlerException {
402        String tmpUnresolved = event.getChildTextTrim(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, event.getNamespace());
403        StringBuilder nonResolvedList = new StringBuilder();
404        CoordCommandUtils.getResolvedList(getMissingDependencies(), new StringBuilder(), nonResolvedList);
405        ActionDependency dependency = new ActionDependency();
406
407        if (nonResolvedList.length() > 0) {
408            Element eDataset = event.getChild("dataset", event.getNamespace());
409            dependency.setUriTemplate(eDataset.getChild("uri-template", eDataset.getNamespace()).getTextTrim());
410            dependency.getMissingDependencies().add(tmpUnresolved);
411        }
412        return dependency;
413    }
414
415    @Override
416    public String getFirstMissingDependency() {
417        StringBuilder nonExistList = new StringBuilder();
418        String missingDependencies = getMissingDependencies();
419        StringBuilder nonResolvedList = new StringBuilder();
420        CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList);
421        String firstMissingDependency = "";
422        if (nonExistList.length() > 0) {
423            firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
424        }
425        else {
426            if (nonResolvedList.length() > 0) {
427                firstMissingDependency = nonResolvedList.toString().split(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR)[0];
428            }
429        }
430        return firstMissingDependency;
431    }
432}