001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.Date;
023    import java.util.List;
024    
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.fs.Path;
027    import org.apache.oozie.CoordinatorActionBean;
028    import org.apache.oozie.CoordinatorJobBean;
029    import org.apache.oozie.ErrorCode;
030    import org.apache.oozie.client.CoordinatorAction;
031    import org.apache.oozie.client.Job;
032    import org.apache.oozie.client.OozieClient;
033    import org.apache.oozie.command.CommandException;
034    import org.apache.oozie.command.PreconditionException;
035    import org.apache.oozie.coord.CoordELEvaluator;
036    import org.apache.oozie.coord.CoordELFunctions;
037    import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
038    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039    import org.apache.oozie.executor.jpa.JPAExecutorException;
040    import org.apache.oozie.service.HadoopAccessorException;
041    import org.apache.oozie.service.HadoopAccessorService;
042    import org.apache.oozie.service.JPAService;
043    import org.apache.oozie.service.Services;
044    import org.apache.oozie.util.DateUtils;
045    import org.apache.oozie.util.ELEvaluator;
046    import org.apache.oozie.util.Instrumentation;
047    import org.apache.oozie.util.LogUtils;
048    import org.apache.oozie.util.ParamChecker;
049    import org.apache.oozie.util.StatusUtils;
050    import org.apache.oozie.util.XConfiguration;
051    import org.apache.oozie.util.XmlUtils;
052    import org.jdom.Element;
053    
054    /**
055     * The command to check if an action's data input paths exist in the file system.
056     */
057    public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
058    
059        private final String actionId;
060        private final int COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
061        private CoordinatorActionBean coordAction = null;
062        private CoordinatorJobBean coordJob = null;
063        private JPAService jpaService = null;
064    
065        public CoordActionInputCheckXCommand(String actionId) {
066            super("coord_action_input", "coord_action_input", 1);
067            this.actionId = ParamChecker.notEmpty(actionId, "actionId");
068        }
069    
070        /* (non-Javadoc)
071         * @see org.apache.oozie.command.XCommand#execute()
072         */
073        @Override
074        protected Void execute() throws CommandException {
075            LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
076    
077            // this action should only get processed if current time > nominal time;
078            // otherwise, requeue this action for delay execution;
079            Date nominalTime = coordAction.getNominalTime();
080            Date currentTime = new Date();
081            if (nominalTime.compareTo(currentTime) > 0) {
082                queue(new CoordActionInputCheckXCommand(coordAction.getId()), Math.max(
083                        (nominalTime.getTime() - currentTime.getTime()), COMMAND_REQUEUE_INTERVAL));
084                // update lastModifiedTime
085                coordAction.setLastModifiedTime(new Date());
086                try {
087                    jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction));
088                }
089                catch (JPAExecutorException e) {
090                    throw new CommandException(e);
091                }
092                LOG.info("[" + actionId
093                        + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
094                        + currentTime + ", nominal=" + nominalTime);
095    
096                return null;
097            }
098    
099            StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
100            Instrumentation.Cron cron = new Instrumentation.Cron();
101            try {
102                Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
103                cron.start();
104                StringBuilder existList = new StringBuilder();
105                StringBuilder nonExistList = new StringBuilder();
106                StringBuilder nonResolvedList = new StringBuilder();
107                CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList);
108    
109                LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + nonExistList.toString() + " "
110                        + nonResolvedList.toString());
111                boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
112                coordAction.setLastModifiedTime(currentTime);
113                coordAction.setActionXml(actionXml.toString());
114                if (nonResolvedList.length() > 0 && status == false) {
115                    nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
116                }
117                coordAction.setMissingDependencies(nonExistList.toString());
118                if (status == true) {
119                    coordAction.setStatus(CoordinatorAction.Status.READY);
120                    // pass jobID to the CoordActionReadyXCommand
121                    queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
122                }
123                else {
124                    long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
125                            .getCreatedTime().getTime()))
126                            / (60 * 1000);
127                    int timeOut = coordAction.getTimeOut();
128                    if ((timeOut >= 0) && (waitingTime > timeOut)) {
129                        queue(new CoordActionTimeOutXCommand(coordAction), 100);
130                    }
131                    else {
132                        queue(new CoordActionInputCheckXCommand(coordAction.getId()), COMMAND_REQUEUE_INTERVAL);
133                    }
134                }
135                coordAction.setLastModifiedTime(new Date());
136                jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction));
137            }
138            catch (Exception e) {
139                throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
140            }
141            cron.stop();
142    
143            return null;
144        }
145    
146        /**
147         * To check the list of input paths if all of them exist
148         *
149         * @param actionXml action xml
150         * @param existList the list of existed paths
151         * @param nonExistList the list of non existed paths
152         * @param conf action configuration
153         * @return true if all input paths are existed
154         * @throws Exception thrown of unable to check input path
155         */
156        protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
157                Configuration conf) throws Exception {
158            Element eAction = XmlUtils.parseXml(actionXml.toString());
159            boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
160            if (allExist) {
161                LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
162                allExist = checkUnresolvedInstances(eAction, conf);
163            }
164            if (allExist == true) {
165                materializeDataProperties(eAction, conf);
166                actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
167            }
168            return allExist;
169        }
170    
171        /**
172         * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
173         * of files that will be needed.
174         *
175         * @param eAction action element
176         * @param conf action configuration
177         * @throws Exception thrown if failed to resolve data properties
178         * @update modify 'Action' element with appropriate list of files.
179         */
180        @SuppressWarnings("unchecked")
181        private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
182            ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
183            Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
184                    eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
185            if (configElem != null) {
186                for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
187                    resolveTagContents("value", propElem, eval);
188                }
189            }
190        }
191    
192        /**
193         * To resolve property value which contains el functions
194         *
195         * @param tagName tag name
196         * @param elem the child element of "property" element
197         * @param eval el functions evaluator
198         * @throws Exception thrown if unable to resolve tag value
199         */
200        private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
201            if (elem == null) {
202                return;
203            }
204            Element tagElem = elem.getChild(tagName, elem.getNamespace());
205            if (tagElem != null) {
206                String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
207                tagElem.removeContent();
208                tagElem.addContent(updated);
209            }
210            else {
211                LOG.warn(" Value NOT FOUND " + tagName);
212            }
213        }
214    
215        /**
216         * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
217         *
218         * @param eAction action element
219         * @param actionConf action configuration
220         * @return true if successful to resolve input and output paths
221         * @throws Exception thrown if failed to resolve data input and output paths
222         */
223        @SuppressWarnings("unchecked")
224        private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
225            String strAction = XmlUtils.prettyPrint(eAction).toString();
226            Date nominalTime = DateUtils.parseDateUTC(eAction.getAttributeValue("action-nominal-time"));
227            String actualTimeStr = eAction.getAttributeValue("action-actual-time");
228            Date actualTime = null;
229            if (actualTimeStr == null) {
230                LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
231                "from previous version. Assign current date to actual time, action = " + actionId);
232                actualTime = new Date();
233            } else {
234                actualTime = DateUtils.parseDateUTC(actualTimeStr);
235            }
236    
237            StringBuffer resultedXml = new StringBuffer();
238    
239            boolean ret;
240            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
241            if (inputList != null) {
242                ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
243                        actualTime, actionConf);
244                if (ret == false) {
245                    resultedXml.append(strAction);
246                    return false;
247                }
248            }
249    
250            // Using latest() or future() in output-event is not intuitive.
251            // We need to make sure, this assumption is correct.
252            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
253            if (outputList != null) {
254                for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
255                    if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
256                        throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
257                                " not permitted in output-event ");
258                    }
259                }
260            }
261            return true;
262        }
263    
264        /**
265         * Resolve the list of data input paths
266         *
267         * @param eDataEvents the list of data input elements
268         * @param nominalTime action nominal time
269         * @param actualTime current time
270         * @param conf action configuration
271         * @return true if all unresolved URIs can be resolved
272         * @throws Exception thrown if failed to resolve data input paths
273         */
274        @SuppressWarnings("unchecked")
275        private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
276                Configuration conf) throws Exception {
277            for (Element dEvent : eDataEvents) {
278                if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
279                    continue;
280                }
281                ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
282                String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
283                String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
284                StringBuffer resolvedTmp = new StringBuffer();
285                for (int i = 0; i < unresolvedList.length; i++) {
286                    String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
287                    Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
288                    if (isResolved == false) {
289                        LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
290                        return false;
291                    }
292                    if (resolvedTmp.length() > 0) {
293                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
294                    }
295                    resolvedTmp.append((String) eval.getVariable("resolved_path"));
296                }
297                if (resolvedTmp.length() > 0) {
298                    if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
299                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
300                                dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
301                        dEvent.removeChild("uris", dEvent.getNamespace());
302                    }
303                    Element uriInstance = new Element("uris", dEvent.getNamespace());
304                    uriInstance.addContent(resolvedTmp.toString());
305                    dEvent.getContent().add(1, uriInstance);
306                }
307                dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
308            }
309    
310            return true;
311        }
312    
313        /**
314         * Check all resolved URIs existence
315         *
316         * @param eAction action element
317         * @param existList the list of existed paths
318         * @param nonExistList the list of paths to check existence
319         * @param conf action configuration
320         * @return true if all nonExistList paths exist
321         * @throws IOException thrown if unable to access the path
322         */
323        private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
324                Configuration conf) throws IOException {
325            LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
326            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
327            if (inputList != null) {
328                if (nonExistList.length() > 0) {
329                    checkListOfPaths(existList, nonExistList, conf);
330                }
331                return nonExistList.length() == 0;
332            }
333            return true;
334        }
335    
336        /**
337         * Check a list of non existed paths and add to exist list if it exists
338         *
339         * @param existList the list of existed paths
340         * @param nonExistList the list of paths to check existence
341         * @param conf action configuration
342         * @return true if all nonExistList paths exist
343         * @throws IOException thrown if unable to access the path
344         */
345        private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
346                throws IOException {
347    
348            String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
349            if (uriList[0] != null) {
350                LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
351            }
352    
353            nonExistList.delete(0, nonExistList.length());
354            boolean allExists = true;
355            String existSeparator = "", nonExistSeparator = "";
356            for (int i = 0; i < uriList.length; i++) {
357                if (allExists) {
358                    allExists = pathExists(uriList[i], conf);
359                    LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
360                }
361                if (allExists) {
362                    existList.append(existSeparator).append(uriList[i]);
363                    existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
364                }
365                else {
366                    nonExistList.append(nonExistSeparator).append(uriList[i]);
367                    nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
368                }
369            }
370            return allExists;
371        }
372    
373        /**
374         * Check if given path exists
375         *
376         * @param sPath uri path
377         * @param actionConf action configuration
378         * @return true if path exists
379         * @throws IOException thrown if unable to access the path
380         */
381        private boolean pathExists(String sPath, Configuration actionConf) throws IOException {
382            LOG.debug("checking for the file " + sPath);
383            Path path = new Path(sPath);
384            String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
385            String group = ParamChecker.notEmpty(actionConf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
386            try {
387                return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(),
388                        new Configuration()).exists(path);
389            }
390            catch (HadoopAccessorException e) {
391                throw new IOException(e);
392            }
393        }
394    
395        /**
396         * The function create a list of URIs separated by "," using the instances time stamp and URI-template
397         *
398         * @param event : <data-in> event
399         * @param instances : List of time stamp seprated by ","
400         * @param unresolvedInstances : list of instance with latest/future function
401         * @return : list of URIs separated by ",".
402         * @throws Exception thrown if failed to create URIs from unresolvedInstances
403         */
404        @SuppressWarnings("unused")
405        private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
406            if (instances == null || instances.length() == 0) {
407                return "";
408            }
409            String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
410            StringBuilder uris = new StringBuilder();
411    
412            for (int i = 0; i < instanceList.length; i++) {
413                int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
414                if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
415                    if (unresolvedInstances.length() > 0) {
416                        unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
417                    }
418                    unresolvedInstances.append(instanceList[i]);
419                    continue;
420                }
421                ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
422                if (uris.length() > 0) {
423                    uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
424                }
425                uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
426                        "uri-template", event.getNamespace()).getTextTrim()));
427            }
428            return uris.toString();
429        }
430    
431        /* (non-Javadoc)
432         * @see org.apache.oozie.command.XCommand#getEntityKey()
433         */
434        @Override
435        protected String getEntityKey() {
436            return coordAction.getJobId();
437        }
438    
439        /* (non-Javadoc)
440         * @see org.apache.oozie.command.XCommand#isLockRequired()
441         */
442        @Override
443        protected boolean isLockRequired() {
444            return true;
445        }
446    
447        /* (non-Javadoc)
448         * @see org.apache.oozie.command.XCommand#eagerLoadState()
449         */
450        @Override
451        protected void eagerLoadState() throws CommandException {
452            loadState();
453        }
454    
455        /* (non-Javadoc)
456         * @see org.apache.oozie.command.XCommand#loadState()
457         */
458        @Override
459        protected void loadState() throws CommandException {
460            if (jpaService == null) {
461                jpaService = Services.get().get(JPAService.class);
462            }
463            try {
464                coordAction = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
465                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
466            }
467            catch (JPAExecutorException je) {
468                throw new CommandException(je);
469            }
470            LogUtils.setLogInfo(coordAction, logInfo);
471        }
472    
473        /* (non-Javadoc)
474         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
475         */
476        @Override
477        protected void verifyPrecondition() throws CommandException, PreconditionException {
478            if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
479                throw new PreconditionException(ErrorCode.E1100, "[" + actionId
480                        + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
481                        + coordAction.getStatus());
482            }
483    
484            // if eligible to do action input check when running with backward support is true
485            if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
486                return;
487            }
488    
489            if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.PAUSED
490                    && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
491                throw new PreconditionException(
492                        ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
493                                    " Coordinator job is not in RUNNING/PAUSED/PAUSEDWITHERROR state, but state="
494                                + coordJob.getStatus());
495            }
496        }
497    
498        /* (non-Javadoc)
499         * @see org.apache.oozie.command.XCommand#getKey()
500         */
501        @Override
502        public String getKey(){
503            return getName() + "_" + actionId;
504        }
505    
506    }