001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.Date;
023    import java.util.List;
024    
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.fs.Path;
027    import org.apache.oozie.CoordinatorActionBean;
028    import org.apache.oozie.CoordinatorJobBean;
029    import org.apache.oozie.ErrorCode;
030    import org.apache.oozie.client.CoordinatorAction;
031    import org.apache.oozie.client.Job;
032    import org.apache.oozie.client.OozieClient;
033    import org.apache.oozie.command.CommandException;
034    import org.apache.oozie.command.PreconditionException;
035    import org.apache.oozie.coord.CoordELEvaluator;
036    import org.apache.oozie.coord.CoordELFunctions;
037    import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
038    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039    import org.apache.oozie.executor.jpa.JPAExecutorException;
040    import org.apache.oozie.service.HadoopAccessorException;
041    import org.apache.oozie.service.HadoopAccessorService;
042    import org.apache.oozie.service.JPAService;
043    import org.apache.oozie.service.Service;
044    import org.apache.oozie.service.Services;
045    import org.apache.oozie.util.DateUtils;
046    import org.apache.oozie.util.ELEvaluator;
047    import org.apache.oozie.util.Instrumentation;
048    import org.apache.oozie.util.LogUtils;
049    import org.apache.oozie.util.ParamChecker;
050    import org.apache.oozie.util.StatusUtils;
051    import org.apache.oozie.util.XConfiguration;
052    import org.apache.oozie.util.XmlUtils;
053    import org.jdom.Element;
054    
055    /**
056     * The command to check if an action's data input paths exist in the file system.
057     */
058    public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
059    
060        private final String actionId;
061        /**
062         * Property name of command re-queue interval for coordinator action input check in
063         * milliseconds.
064         */
065        public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
066                + "coord.input.check.requeue.interval";
067        /**
068         * Default re-queue interval in ms. It is applied when no value defined in
069         * the oozie configuration.
070         */
071        private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
072        private CoordinatorActionBean coordAction = null;
073        private CoordinatorJobBean coordJob = null;
074        private JPAService jpaService = null;
075        private String jobId = null;
076    
077        public CoordActionInputCheckXCommand(String actionId, String jobId) {
078            super("coord_action_input", "coord_action_input", 1);
079            this.actionId = ParamChecker.notEmpty(actionId, "actionId");
080            this.jobId = jobId;
081        }
082    
083        /* (non-Javadoc)
084         * @see org.apache.oozie.command.XCommand#execute()
085         */
086        @Override
087        protected Void execute() throws CommandException {
088            LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
089    
090            // this action should only get processed if current time > nominal time;
091            // otherwise, requeue this action for delay execution;
092            Date nominalTime = coordAction.getNominalTime();
093            Date currentTime = new Date();
094            if (nominalTime.compareTo(currentTime) > 0) {
095                queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
096                        .getTime()), getCoordInputCheckRequeueInterval()));
097                // update lastModifiedTime
098                coordAction.setLastModifiedTime(new Date());
099                try {
100                    jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(coordAction));
101                }
102                catch (JPAExecutorException e) {
103                    throw new CommandException(e);
104                }
105                LOG.info("[" + actionId
106                        + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
107                        + currentTime + ", nominal=" + nominalTime);
108    
109                return null;
110            }
111    
112            StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
113            Instrumentation.Cron cron = new Instrumentation.Cron();
114            try {
115                Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
116                cron.start();
117                StringBuilder existList = new StringBuilder();
118                StringBuilder nonExistList = new StringBuilder();
119                StringBuilder nonResolvedList = new StringBuilder();
120                String firstMissingDependency = "";
121                CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList);
122    
123                // For clarity regarding which is the missing dependency in synchronous order
124                // instead of printing entire list, some of which, may be available
125                if(nonExistList.length() > 0) {
126                    firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
127                }
128                LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
129                        + nonResolvedList.toString());
130                // Updating the list of data dependencies that are available and those that are yet not
131                boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
132                coordAction.setLastModifiedTime(currentTime);
133                coordAction.setActionXml(actionXml.toString());
134                if (nonResolvedList.length() > 0 && status == false) {
135                    nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
136                }
137                coordAction.setMissingDependencies(nonExistList.toString());
138                if (status == true) {
139                    coordAction.setStatus(CoordinatorAction.Status.READY);
140                    // pass jobID to the CoordActionReadyXCommand
141                    queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
142                }
143                else {
144                    long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
145                            .getCreatedTime().getTime()))
146                            / (60 * 1000);
147                    int timeOut = coordAction.getTimeOut();
148                    if ((timeOut >= 0) && (waitingTime > timeOut)) {
149                        queue(new CoordActionTimeOutXCommand(coordAction), 100);
150                    }
151                    else {
152                        queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval());
153                    }
154                }
155                coordAction.setLastModifiedTime(new Date());
156                jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(coordAction));
157            }
158            catch (Exception e) {
159                throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
160            }
161            cron.stop();
162    
163            return null;
164        }
165    
166        /**
167         * This function reads the value of re-queue interval for coordinator input
168         * check command from the Oozie configuration provided by Configuration
169         * Service. If nothing defined in the configuration, it uses the code
170         * specified default value.
171         *
172         * @return re-queue interval in ms
173         */
174        public long getCoordInputCheckRequeueInterval() {
175            long requeueInterval = Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
176                    DEFAULT_COMMAND_REQUEUE_INTERVAL);
177            return requeueInterval;
178        }
179    
180        /**
181         * To check the list of input paths if all of them exist
182         *
183         * @param actionXml action xml
184         * @param existList the list of existed paths
185         * @param nonExistList the list of non existed paths
186         * @param conf action configuration
187         * @return true if all input paths are existed
188         * @throws Exception thrown of unable to check input path
189         */
190        protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
191                Configuration conf) throws Exception {
192            Element eAction = XmlUtils.parseXml(actionXml.toString());
193            boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
194            if (allExist) {
195                LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
196                allExist = checkUnresolvedInstances(eAction, conf);
197            }
198            if (allExist == true) {
199                materializeDataProperties(eAction, conf);
200                actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
201            }
202            return allExist;
203        }
204    
205        /**
206         * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
207         * of files that will be needed.
208         *
209         * @param eAction action element
210         * @param conf action configuration
211         * @throws Exception thrown if failed to resolve data properties
212         * @update modify 'Action' element with appropriate list of files.
213         */
214        @SuppressWarnings("unchecked")
215        private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
216            ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
217            Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
218                    eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
219            if (configElem != null) {
220                for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
221                    resolveTagContents("value", propElem, eval);
222                }
223            }
224        }
225    
226        /**
227         * To resolve property value which contains el functions
228         *
229         * @param tagName tag name
230         * @param elem the child element of "property" element
231         * @param eval el functions evaluator
232         * @throws Exception thrown if unable to resolve tag value
233         */
234        private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
235            if (elem == null) {
236                return;
237            }
238            Element tagElem = elem.getChild(tagName, elem.getNamespace());
239            if (tagElem != null) {
240                String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
241                tagElem.removeContent();
242                tagElem.addContent(updated);
243            }
244            else {
245                LOG.warn(" Value NOT FOUND " + tagName);
246            }
247        }
248    
249        /**
250         * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
251         *
252         * @param eAction action element
253         * @param actionConf action configuration
254         * @return true if successful to resolve input and output paths
255         * @throws Exception thrown if failed to resolve data input and output paths
256         */
257        @SuppressWarnings("unchecked")
258        private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
259            String strAction = XmlUtils.prettyPrint(eAction).toString();
260            Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
261            String actualTimeStr = eAction.getAttributeValue("action-actual-time");
262            Date actualTime = null;
263            if (actualTimeStr == null) {
264                LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
265                "from previous version. Assign current date to actual time, action = " + actionId);
266                actualTime = new Date();
267            } else {
268                actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
269            }
270    
271            StringBuffer resultedXml = new StringBuffer();
272    
273            boolean ret;
274            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
275            if (inputList != null) {
276                ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
277                        actualTime, actionConf);
278                if (ret == false) {
279                    resultedXml.append(strAction);
280                    return false;
281                }
282            }
283    
284            // Using latest() or future() in output-event is not intuitive.
285            // We need to make sure, this assumption is correct.
286            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
287            if (outputList != null) {
288                for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
289                    if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
290                        throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
291                                " not permitted in output-event ");
292                    }
293                }
294            }
295            return true;
296        }
297    
298        /**
299         * Resolve the list of data input paths
300         *
301         * @param eDataEvents the list of data input elements
302         * @param nominalTime action nominal time
303         * @param actualTime current time
304         * @param conf action configuration
305         * @return true if all unresolved URIs can be resolved
306         * @throws Exception thrown if failed to resolve data input paths
307         */
308        @SuppressWarnings("unchecked")
309        private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
310                Configuration conf) throws Exception {
311            for (Element dEvent : eDataEvents) {
312                if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
313                    continue;
314                }
315                ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
316                String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
317                String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
318                StringBuffer resolvedTmp = new StringBuffer();
319                for (int i = 0; i < unresolvedList.length; i++) {
320                    String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
321                    Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
322                    if (isResolved == false) {
323                        LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
324                        return false;
325                    }
326                    if (resolvedTmp.length() > 0) {
327                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
328                    }
329                    resolvedTmp.append((String) eval.getVariable("resolved_path"));
330                }
331                if (resolvedTmp.length() > 0) {
332                    if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
333                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
334                                dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
335                        dEvent.removeChild("uris", dEvent.getNamespace());
336                    }
337                    Element uriInstance = new Element("uris", dEvent.getNamespace());
338                    uriInstance.addContent(resolvedTmp.toString());
339                    dEvent.getContent().add(1, uriInstance);
340                }
341                dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
342            }
343    
344            return true;
345        }
346    
347        /**
348         * Check all resolved URIs existence
349         *
350         * @param eAction action element
351         * @param existList the list of existed paths
352         * @param nonExistList the list of paths to check existence
353         * @param conf action configuration
354         * @return true if all nonExistList paths exist
355         * @throws IOException thrown if unable to access the path
356         */
357        private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
358                Configuration conf) throws IOException {
359            LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
360            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
361            if (inputList != null) {
362                if (nonExistList.length() > 0) {
363                    checkListOfPaths(existList, nonExistList, conf);
364                }
365                return nonExistList.length() == 0;
366            }
367            return true;
368        }
369    
370        /**
371         * Check a list of non existed paths and add to exist list if it exists
372         *
373         * @param existList the list of existed paths
374         * @param nonExistList the list of paths to check existence
375         * @param conf action configuration
376         * @return true if all nonExistList paths exist
377         * @throws IOException thrown if unable to access the path
378         */
379        private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
380                throws IOException {
381    
382            String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
383            if (uriList[0] != null) {
384                LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
385            }
386    
387            nonExistList.delete(0, nonExistList.length());
388            boolean allExists = true;
389            String existSeparator = "", nonExistSeparator = "";
390            for (int i = 0; i < uriList.length; i++) {
391                if (allExists) {
392                    allExists = pathExists(uriList[i], conf);
393                    LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
394                }
395                if (allExists) {
396                    existList.append(existSeparator).append(uriList[i]);
397                    existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
398                }
399                else {
400                    nonExistList.append(nonExistSeparator).append(uriList[i]);
401                    nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
402                }
403            }
404            return allExists;
405        }
406    
407        /**
408         * Check if given path exists
409         *
410         * @param sPath uri path
411         * @param actionConf action configuration
412         * @return true if path exists
413         * @throws IOException thrown if unable to access the path
414         */
415        private boolean pathExists(String sPath, Configuration actionConf) throws IOException {
416            LOG.debug("checking for the file " + sPath);
417            Path path = new Path(sPath);
418            String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
419            try {
420                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
421                Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
422                return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
423            }
424            catch (HadoopAccessorException e) {
425                throw new IOException(e);
426            }
427        }
428    
429        /**
430         * The function create a list of URIs separated by "," using the instances time stamp and URI-template
431         *
432         * @param event : <data-in> event
433         * @param instances : List of time stamp seprated by ","
434         * @param unresolvedInstances : list of instance with latest/future function
435         * @return : list of URIs separated by ",".
436         * @throws Exception thrown if failed to create URIs from unresolvedInstances
437         */
438        @SuppressWarnings("unused")
439        private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
440            if (instances == null || instances.length() == 0) {
441                return "";
442            }
443            String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
444            StringBuilder uris = new StringBuilder();
445    
446            for (int i = 0; i < instanceList.length; i++) {
447                int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
448                if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
449                    if (unresolvedInstances.length() > 0) {
450                        unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
451                    }
452                    unresolvedInstances.append(instanceList[i]);
453                    continue;
454                }
455                ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
456                if (uris.length() > 0) {
457                    uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
458                }
459                uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
460                        "uri-template", event.getNamespace()).getTextTrim()));
461            }
462            return uris.toString();
463        }
464    
465        /* (non-Javadoc)
466         * @see org.apache.oozie.command.XCommand#getEntityKey()
467         */
468        @Override
469        public String getEntityKey() {
470            return this.jobId;
471        }
472    
473        /* (non-Javadoc)
474         * @see org.apache.oozie.command.XCommand#isLockRequired()
475         */
476        @Override
477        protected boolean isLockRequired() {
478            return true;
479        }
480    
481        /* (non-Javadoc)
482         * @see org.apache.oozie.command.XCommand#eagerLoadState()
483         */
484        @Override
485        protected void eagerLoadState() throws CommandException {
486            loadState();
487        }
488    
489        /* (non-Javadoc)
490         * @see org.apache.oozie.command.XCommand#loadState()
491         */
492        @Override
493        protected void loadState() throws CommandException {
494            if (jpaService == null) {
495                jpaService = Services.get().get(JPAService.class);
496            }
497            try {
498                coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
499                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
500            }
501            catch (JPAExecutorException je) {
502                throw new CommandException(je);
503            }
504            LogUtils.setLogInfo(coordAction, logInfo);
505        }
506    
507        /* (non-Javadoc)
508         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
509         */
510        @Override
511        protected void verifyPrecondition() throws CommandException, PreconditionException {
512            if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
513                throw new PreconditionException(ErrorCode.E1100, "[" + actionId
514                        + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
515                        + coordAction.getStatus());
516            }
517    
518            // if eligible to do action input check when running with backward support is true
519            if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
520                return;
521            }
522    
523            if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED
524                    && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
525                throw new PreconditionException(
526                        ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
527                                    " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
528                                + coordJob.getStatus());
529            }
530        }
531    
532        /* (non-Javadoc)
533         * @see org.apache.oozie.command.XCommand#getKey()
534         */
535        @Override
536        public String getKey(){
537            return getName() + "_" + actionId;
538        }
539    
540    }