001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.coord.input.dependency;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.net.URISyntaxException;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.Date;
028import java.util.List;
029
030import org.apache.commons.lang.StringUtils;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.security.AccessControlException;
033import org.apache.oozie.CoordinatorActionBean;
034import org.apache.oozie.ErrorCode;
035import org.apache.oozie.client.OozieClient;
036import org.apache.oozie.command.CommandException;
037import org.apache.oozie.command.coord.CoordCommandUtils;
038import org.apache.oozie.coord.CoordELConstants;
039import org.apache.oozie.coord.CoordELEvaluator;
040import org.apache.oozie.coord.CoordELFunctions;
041import org.apache.oozie.dependency.ActionDependency;
042import org.apache.oozie.dependency.DependencyChecker;
043import org.apache.oozie.dependency.URIHandlerException;
044import org.apache.oozie.util.DateUtils;
045import org.apache.oozie.util.ELEvaluator;
046import org.apache.oozie.util.ParamChecker;
047import org.apache.oozie.util.XConfiguration;
048import org.apache.oozie.util.XLog;
049import org.apache.oozie.util.XmlUtils;
050import org.jdom.Element;
051import org.jdom.JDOMException;
052
053/**
054 * Old approach where dependencies are stored as String.
055 *
056 */
057public class CoordOldInputDependency implements CoordInputDependency {
058
059    private XLog log = XLog.getLog(getClass());
060
061    protected transient String missingDependencies = "";
062
063    public CoordOldInputDependency(String missingDependencies) {
064        this.missingDependencies = missingDependencies;
065    }
066
067    public CoordOldInputDependency() {
068    }
069
070    @Override
071    public void addInputInstanceList(String inputEventName, List<CoordInputInstance> inputInstanceList) {
072        appendToDependencies(inputInstanceList);
073    }
074
075    @Override
076    public String getMissingDependencies() {
077        return missingDependencies;
078    }
079
080    @Override
081    public boolean isDependencyMet() {
082        return StringUtils.isEmpty(missingDependencies);
083    }
084
085    @Override
086    public boolean isUnResolvedDependencyMet() {
087        return false;
088    }
089
090    @Override
091    public void setDependencyMet(boolean isDependencyMeet) {
092        if (isDependencyMeet) {
093            missingDependencies = "";
094        }
095
096    }
097
098    @Override
099    public String serialize() throws IOException {
100        return missingDependencies;
101    }
102
103    @Override
104    public List<String> getMissingDependenciesAsList() {
105        return Arrays.asList(DependencyChecker.dependenciesAsArray(missingDependencies));
106    }
107
108    @Override
109    public List<String> getAvailableDependenciesAsList() {
110        return new ArrayList<String>();
111    }
112
113    @Override
114    public void setMissingDependencies(String missingDependencies) {
115        this.missingDependencies = missingDependencies;
116
117    }
118
119    public void appendToDependencies(List<CoordInputInstance> inputInstanceList) {
120        StringBuilder sb = new StringBuilder(missingDependencies);
121        boolean isFirst = true;
122        for (CoordInputInstance coordInputInstance : inputInstanceList) {
123            if (isFirst) {
124                if (!StringUtils.isEmpty(sb.toString())) {
125                    sb.append(CoordELFunctions.INSTANCE_SEPARATOR);
126                }
127            }
128            else {
129                sb.append(CoordELFunctions.INSTANCE_SEPARATOR);
130
131            }
132            sb.append(coordInputInstance.getInputDataInstance());
133            isFirst = false;
134        }
135        missingDependencies = sb.toString();
136    }
137
138    @Override
139    public void addUnResolvedList(String name, String unresolvedDependencies) {
140        StringBuilder sb = new StringBuilder(missingDependencies);
141        sb.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedDependencies);
142        missingDependencies = sb.toString();
143    }
144
145    @Override
146    public List<String> getAvailableDependencies(String dataSet) {
147        return null;
148    }
149
150    @Override
151    public void addToAvailableDependencies(Collection<String> availableList) {
152
153        if (StringUtils.isEmpty(missingDependencies)) {
154            return;
155        }
156        List<String> missingDependenciesList = new ArrayList<String>(Arrays.asList((DependencyChecker
157                .dependenciesAsArray(missingDependencies))));
158        missingDependenciesList.removeAll(availableList);
159        missingDependencies = DependencyChecker.dependenciesAsString(missingDependenciesList);
160
161    }
162
163    @Override
164    public boolean checkPullMissingDependencies(CoordinatorActionBean coordAction, StringBuilder existList,
165            StringBuilder nonExistList) throws IOException, JDOMException {
166        Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
167        Element eAction = XmlUtils.parseXml(coordAction.getActionXml());
168
169        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
170        if (inputList != null) {
171            if (nonExistList.length() > 0) {
172                checkListOfPaths(coordAction, existList, nonExistList, actionConf);
173            }
174            return nonExistList.length() == 0;
175        }
176        return true;
177    }
178
179    public ActionDependency checkPushMissingDependencies(CoordinatorActionBean coordAction,
180            boolean registerForNotification) throws CommandException, IOException {
181        return DependencyChecker.checkForAvailability(getMissingDependenciesAsList(), new XConfiguration(
182                new StringReader(coordAction.getRunConf())), !registerForNotification);
183    }
184
185    private boolean checkListOfPaths(CoordinatorActionBean coordAction, StringBuilder existList,
186            StringBuilder nonExistList, Configuration conf) throws IOException {
187
188        String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
189        if (uriList[0] != null) {
190            log.info("[" + coordAction.getId() + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0]
191                    + " is Missing.");
192        }
193
194        nonExistList.delete(0, nonExistList.length());
195        boolean allExists = true;
196        String existSeparator = "", nonExistSeparator = "";
197        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
198        for (int i = 0; i < uriList.length; i++) {
199            if (allExists) {
200                allExists = pathExists(coordAction, uriList[i], conf, user);
201                log.info("[" + coordAction.getId() + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :"
202                        + allExists);
203            }
204            if (allExists) {
205                existList.append(existSeparator).append(uriList[i]);
206                existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
207            }
208            else {
209                nonExistList.append(nonExistSeparator).append(uriList[i]);
210                nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
211            }
212        }
213        return allExists;
214    }
215
216    public boolean pathExists(CoordinatorActionBean coordAction, String sPath, Configuration actionConf, String user)
217            throws IOException {
218        log.debug("checking for the file " + sPath);
219        try {
220            return CoordCommandUtils.pathExists(sPath, actionConf, user);
221        }
222        catch (URIHandlerException e) {
223            if (coordAction != null) {
224                coordAction.setErrorCode(e.getErrorCode().toString());
225                coordAction.setErrorMessage(e.getMessage());
226            }
227            if (e.getCause() != null && e.getCause() instanceof AccessControlException) {
228                throw (AccessControlException) e.getCause();
229            }
230            else {
231                log.error(e);
232                throw new IOException(e);
233            }
234        }
235        catch (URISyntaxException e) {
236            if (coordAction != null) {
237                coordAction.setErrorCode(ErrorCode.E0906.toString());
238                coordAction.setErrorMessage(e.getMessage());
239            }
240            log.error(e);
241            throw new IOException(e);
242        }
243    }
244
245    public boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
246            StringBuilder nonResolvedList, boolean status) {
247        if ((!nonExistList.toString().equals(missingDependencies) || missingDependencies.isEmpty())) {
248            setMissingDependencies(nonExistList.toString());
249            return true;
250        }
251        return false;
252    }
253
254    @SuppressWarnings("unchecked")
255    public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction)
256            throws Exception {
257        Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
258        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
259        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
260
261        if(inputList==null){
262            return true;
263        }
264
265        List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace());
266        Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
267
268        if (eDataEvents != null) {
269            Date actualTime = null;
270            if (actualTimeStr == null) {
271                actualTime = new Date();
272            }
273            else {
274                actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
275            }
276
277            for (Element dEvent : eDataEvents) {
278                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) == null) {
279                    continue;
280                }
281                ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, actionConf);
282                String unResolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG,
283                        dEvent.getNamespace()).getTextTrim();
284                String unresolvedList[] = unResolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
285                StringBuffer resolvedTmp = new StringBuffer();
286                for (int i = 0; i < unresolvedList.length; i++) {
287                    String returnData = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
288                    Boolean isResolved = (Boolean) eval.getVariable(CoordELConstants.IS_RESOLVED);
289                    if (isResolved == false) {
290                        log.info("[" + coordAction.getId() + "] :: Cannot resolve : " + returnData);
291                        return false;
292                    }
293                    if (resolvedTmp.length() > 0) {
294                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
295                    }
296                    resolvedTmp.append((String) eval.getVariable(CoordELConstants.RESOLVED_PATH));
297                }
298                if (resolvedTmp.length() > 0) {
299                    if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
300                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
301                                dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
302                        dEvent.removeChild("uris", dEvent.getNamespace());
303                    }
304                    Element uriInstance = new Element("uris", dEvent.getNamespace());
305                    uriInstance.addContent(resolvedTmp.toString());
306                    dEvent.getContent().add(1, uriInstance);
307                }
308                dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace());
309            }
310        }
311
312        return true;
313    }
314
315}