/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.Date;
023    import java.util.List;
024    
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.fs.Path;
027    import org.apache.oozie.CoordinatorActionBean;
028    import org.apache.oozie.CoordinatorJobBean;
029    import org.apache.oozie.ErrorCode;
030    import org.apache.oozie.client.CoordinatorAction;
031    import org.apache.oozie.client.Job;
032    import org.apache.oozie.client.OozieClient;
033    import org.apache.oozie.command.CommandException;
034    import org.apache.oozie.command.PreconditionException;
035    import org.apache.oozie.coord.CoordELEvaluator;
036    import org.apache.oozie.coord.CoordELFunctions;
037    import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
038    import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
039    import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
040    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
041    import org.apache.oozie.executor.jpa.JPAExecutorException;
042    import org.apache.oozie.service.HadoopAccessorException;
043    import org.apache.oozie.service.HadoopAccessorService;
044    import org.apache.oozie.service.JPAService;
045    import org.apache.oozie.service.Service;
046    import org.apache.oozie.service.Services;
047    import org.apache.oozie.util.DateUtils;
048    import org.apache.oozie.util.ELEvaluator;
049    import org.apache.oozie.util.Instrumentation;
050    import org.apache.oozie.util.LogUtils;
051    import org.apache.oozie.util.ParamChecker;
052    import org.apache.oozie.util.StatusUtils;
053    import org.apache.oozie.util.XConfiguration;
054    import org.apache.oozie.util.XmlUtils;
055    import org.jdom.Element;
056    
057    /**
058     * The command to check if an action's data input paths exist in the file system.
059     */
060    public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
061    
062        private final String actionId;
063        /**
064         * Property name of command re-queue interval for coordinator action input check in
065         * milliseconds.
066         */
067        public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
068                + "coord.input.check.requeue.interval";
069        /**
070         * Default re-queue interval in ms. It is applied when no value defined in
071         * the oozie configuration.
072         */
073        private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
074        private CoordinatorActionBean coordAction = null;
075        private CoordinatorJobBean coordJob = null;
076        private JPAService jpaService = null;
077        private String jobId = null;
078    
079        public CoordActionInputCheckXCommand(String actionId, String jobId) {
080            super("coord_action_input", "coord_action_input", 1);
081            this.actionId = ParamChecker.notEmpty(actionId, "actionId");
082            this.jobId = jobId;
083        }
084    
085        /* (non-Javadoc)
086         * @see org.apache.oozie.command.XCommand#execute()
087         */
088        @Override
089        protected Void execute() throws CommandException {
090            LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
091    
092            // this action should only get processed if current time > nominal time;
093            // otherwise, requeue this action for delay execution;
094            Date nominalTime = coordAction.getNominalTime();
095            Date currentTime = new Date();
096            if (nominalTime.compareTo(currentTime) > 0) {
097                queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
098                        .getTime()), getCoordInputCheckRequeueInterval()));
099                // update lastModifiedTime
100                coordAction.setLastModifiedTime(new Date());
101                try {
102                    jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
103                }
104                catch (JPAExecutorException e) {
105                    throw new CommandException(e);
106                }
107                LOG.info("[" + actionId
108                        + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
109                        + currentTime + ", nominal=" + nominalTime);
110    
111                return null;
112            }
113    
114            StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
115            Instrumentation.Cron cron = new Instrumentation.Cron();
116            boolean isChangeInDependency = false;
117            try {
118                Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
119                cron.start();
120                StringBuilder existList = new StringBuilder();
121                StringBuilder nonExistList = new StringBuilder();
122                StringBuilder nonResolvedList = new StringBuilder();
123                String firstMissingDependency = "";
124                String missingDeps = coordAction.getMissingDependencies();
125                CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList);
126    
127                // For clarity regarding which is the missing dependency in synchronous order
128                // instead of printing entire list, some of which, may be available
129                if(nonExistList.length() > 0) {
130                    firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
131                }
132                LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
133                        + nonResolvedList.toString());
134                // Updating the list of data dependencies that are available and those that are yet not
135                boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
136                coordAction.setLastModifiedTime(currentTime);
137                coordAction.setActionXml(actionXml.toString());
138                if (nonResolvedList.length() > 0 && status == false) {
139                    nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
140                }
141                String nonExistListStr = nonExistList.toString();
142                if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
143                    // missingDeps empty means action should become READY
144                    isChangeInDependency = true;
145                    coordAction.setMissingDependencies(nonExistListStr);
146                }
147                if (status == true) {
148                    coordAction.setStatus(CoordinatorAction.Status.READY);
149                    // pass jobID to the CoordActionReadyXCommand
150                    queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
151                }
152                else {
153                    long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
154                            .getCreatedTime().getTime()))
155                            / (60 * 1000);
156                    int timeOut = coordAction.getTimeOut();
157                    if ((timeOut >= 0) && (waitingTime > timeOut)) {
158                        queue(new CoordActionTimeOutXCommand(coordAction), 100);
159                    }
160                    else {
161                        queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval());
162                    }
163                }
164            }
165            catch (Exception e) {
166                throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
167            }
168            finally {
169                coordAction.setLastModifiedTime(new Date());
170                cron.stop();
171                if(jpaService != null) {
172                    try {
173                        if (isChangeInDependency) {
174                            jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
175                        }
176                        else {
177                            jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
178                        }
179                    }
180                    catch(JPAExecutorException jex) {
181                        throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
182                    }
183                }
184            }
185            return null;
186        }
187    
188        /**
189         * This function reads the value of re-queue interval for coordinator input
190         * check command from the Oozie configuration provided by Configuration
191         * Service. If nothing defined in the configuration, it uses the code
192         * specified default value.
193         *
194         * @return re-queue interval in ms
195         */
196        public long getCoordInputCheckRequeueInterval() {
197            long requeueInterval = Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
198                    DEFAULT_COMMAND_REQUEUE_INTERVAL);
199            return requeueInterval;
200        }
201    
202        /**
203         * To check the list of input paths if all of them exist
204         *
205         * @param actionXml action xml
206         * @param existList the list of existed paths
207         * @param nonExistList the list of non existed paths
208         * @param conf action configuration
209         * @return true if all input paths are existed
210         * @throws Exception thrown of unable to check input path
211         */
212        protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
213                Configuration conf) throws Exception {
214            Element eAction = XmlUtils.parseXml(actionXml.toString());
215            boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
216            if (allExist) {
217                LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
218                allExist = checkUnresolvedInstances(eAction, conf);
219            }
220            if (allExist == true) {
221                materializeDataProperties(eAction, conf);
222                actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
223            }
224            return allExist;
225        }
226    
227        /**
228         * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
229         * of files that will be needed.
230         *
231         * @param eAction action element
232         * @param conf action configuration
233         * @throws Exception thrown if failed to resolve data properties
234         * @update modify 'Action' element with appropriate list of files.
235         */
236        @SuppressWarnings("unchecked")
237        private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
238            ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
239            Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
240                    eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
241            if (configElem != null) {
242                for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
243                    resolveTagContents("value", propElem, eval);
244                }
245            }
246        }
247    
248        /**
249         * To resolve property value which contains el functions
250         *
251         * @param tagName tag name
252         * @param elem the child element of "property" element
253         * @param eval el functions evaluator
254         * @throws Exception thrown if unable to resolve tag value
255         */
256        private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
257            if (elem == null) {
258                return;
259            }
260            Element tagElem = elem.getChild(tagName, elem.getNamespace());
261            if (tagElem != null) {
262                String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
263                tagElem.removeContent();
264                tagElem.addContent(updated);
265            }
266            else {
267                LOG.warn(" Value NOT FOUND " + tagName);
268            }
269        }
270    
271        /**
272         * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
273         *
274         * @param eAction action element
275         * @param actionConf action configuration
276         * @return true if successful to resolve input and output paths
277         * @throws Exception thrown if failed to resolve data input and output paths
278         */
279        @SuppressWarnings("unchecked")
280        private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
281            String strAction = XmlUtils.prettyPrint(eAction).toString();
282            Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
283            String actualTimeStr = eAction.getAttributeValue("action-actual-time");
284            Date actualTime = null;
285            if (actualTimeStr == null) {
286                LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
287                "from previous version. Assign current date to actual time, action = " + actionId);
288                actualTime = new Date();
289            } else {
290                actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
291            }
292    
293            StringBuffer resultedXml = new StringBuffer();
294    
295            boolean ret;
296            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
297            if (inputList != null) {
298                ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
299                        actualTime, actionConf);
300                if (ret == false) {
301                    resultedXml.append(strAction);
302                    return false;
303                }
304            }
305    
306            // Using latest() or future() in output-event is not intuitive.
307            // We need to make sure, this assumption is correct.
308            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
309            if (outputList != null) {
310                for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
311                    if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
312                        throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
313                                " not permitted in output-event ");
314                    }
315                }
316            }
317            return true;
318        }
319    
320        /**
321         * Resolve the list of data input paths
322         *
323         * @param eDataEvents the list of data input elements
324         * @param nominalTime action nominal time
325         * @param actualTime current time
326         * @param conf action configuration
327         * @return true if all unresolved URIs can be resolved
328         * @throws Exception thrown if failed to resolve data input paths
329         */
330        @SuppressWarnings("unchecked")
331        private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
332                Configuration conf) throws Exception {
333            for (Element dEvent : eDataEvents) {
334                if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
335                    continue;
336                }
337                ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
338                String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
339                String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
340                StringBuffer resolvedTmp = new StringBuffer();
341                for (int i = 0; i < unresolvedList.length; i++) {
342                    String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
343                    Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
344                    if (isResolved == false) {
345                        LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
346                        return false;
347                    }
348                    if (resolvedTmp.length() > 0) {
349                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
350                    }
351                    resolvedTmp.append((String) eval.getVariable("resolved_path"));
352                }
353                if (resolvedTmp.length() > 0) {
354                    if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
355                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
356                                dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
357                        dEvent.removeChild("uris", dEvent.getNamespace());
358                    }
359                    Element uriInstance = new Element("uris", dEvent.getNamespace());
360                    uriInstance.addContent(resolvedTmp.toString());
361                    dEvent.getContent().add(1, uriInstance);
362                }
363                dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
364            }
365    
366            return true;
367        }
368    
369        /**
370         * Check all resolved URIs existence
371         *
372         * @param eAction action element
373         * @param existList the list of existed paths
374         * @param nonExistList the list of paths to check existence
375         * @param conf action configuration
376         * @return true if all nonExistList paths exist
377         * @throws IOException thrown if unable to access the path
378         */
379        private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
380                Configuration conf) throws IOException {
381            LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
382            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
383            if (inputList != null) {
384                if (nonExistList.length() > 0) {
385                    checkListOfPaths(existList, nonExistList, conf);
386                }
387                return nonExistList.length() == 0;
388            }
389            return true;
390        }
391    
392        /**
393         * Check a list of non existed paths and add to exist list if it exists
394         *
395         * @param existList the list of existed paths
396         * @param nonExistList the list of paths to check existence
397         * @param conf action configuration
398         * @return true if all nonExistList paths exist
399         * @throws IOException thrown if unable to access the path
400         */
401        private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
402                throws IOException {
403    
404            String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
405            if (uriList[0] != null) {
406                LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
407            }
408    
409            nonExistList.delete(0, nonExistList.length());
410            boolean allExists = true;
411            String existSeparator = "", nonExistSeparator = "";
412            for (int i = 0; i < uriList.length; i++) {
413                if (allExists) {
414                    allExists = pathExists(uriList[i], conf);
415                    LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
416                }
417                if (allExists) {
418                    existList.append(existSeparator).append(uriList[i]);
419                    existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
420                }
421                else {
422                    nonExistList.append(nonExistSeparator).append(uriList[i]);
423                    nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
424                }
425            }
426            return allExists;
427        }
428    
429        /**
430         * Check if given path exists
431         *
432         * @param sPath uri path
433         * @param actionConf action configuration
434         * @return true if path exists
435         * @throws IOException thrown if unable to access the path
436         */
437        protected boolean pathExists(String sPath, Configuration actionConf) throws IOException {
438            LOG.debug("checking for the file " + sPath);
439            Path path = new Path(sPath);
440            String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
441            try {
442                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
443                Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
444                return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
445            }
446            catch (HadoopAccessorException e) {
447                coordAction.setErrorCode(e.getErrorCode().toString());
448                coordAction.setErrorMessage(e.getMessage());
449                throw new IOException(e);
450            }
451        }
452    
453        /**
454         * The function create a list of URIs separated by "," using the instances time stamp and URI-template
455         *
456         * @param event : <data-in> event
457         * @param instances : List of time stamp seprated by ","
458         * @param unresolvedInstances : list of instance with latest/future function
459         * @return : list of URIs separated by ",".
460         * @throws Exception thrown if failed to create URIs from unresolvedInstances
461         */
462        @SuppressWarnings("unused")
463        private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
464            if (instances == null || instances.length() == 0) {
465                return "";
466            }
467            String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
468            StringBuilder uris = new StringBuilder();
469    
470            for (int i = 0; i < instanceList.length; i++) {
471                int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
472                if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
473                    if (unresolvedInstances.length() > 0) {
474                        unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
475                    }
476                    unresolvedInstances.append(instanceList[i]);
477                    continue;
478                }
479                ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
480                if (uris.length() > 0) {
481                    uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
482                }
483                uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
484                        "uri-template", event.getNamespace()).getTextTrim()));
485            }
486            return uris.toString();
487        }
488    
489        /**
490         * getting the error code of the coord action. (used mainly for unit testing)
491         */
492        protected String getCoordActionErrorCode() {
493            if (coordAction != null) {
494                return coordAction.getErrorCode();
495            }
496            return null;
497        }
498    
499        /**
500         * getting the error message of the coord action. (used mainly for unit testing)
501         */
502        protected String getCoordActionErrorMsg() {
503            if (coordAction != null) {
504                return coordAction.getErrorMessage();
505            }
506            return null;
507        }
508    
509        /* (non-Javadoc)
510         * @see org.apache.oozie.command.XCommand#getEntityKey()
511         */
512        @Override
513        public String getEntityKey() {
514            return this.jobId;
515        }
516    
517        /* (non-Javadoc)
518         * @see org.apache.oozie.command.XCommand#isLockRequired()
519         */
520        @Override
521        protected boolean isLockRequired() {
522            return true;
523        }
524    
525        /* (non-Javadoc)
526         * @see org.apache.oozie.command.XCommand#eagerLoadState()
527         */
528        // TODO - why loadState() is being called from eagerLoadState();
529        @Override
530        protected void eagerLoadState() throws CommandException {
531            loadState();
532        }
533    
534        /* (non-Javadoc)
535         * @see org.apache.oozie.command.XCommand#loadState()
536         */
537        @Override
538        protected void loadState() throws CommandException {
539            if (jpaService == null) {
540                jpaService = Services.get().get(JPAService.class);
541            }
542            try {
543                coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
544                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
545            }
546            catch (JPAExecutorException je) {
547                throw new CommandException(je);
548            }
549            LogUtils.setLogInfo(coordAction, logInfo);
550        }
551    
552        /* (non-Javadoc)
553         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
554         */
555        @Override
556        protected void verifyPrecondition() throws CommandException, PreconditionException {
557            if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
558                throw new PreconditionException(ErrorCode.E1100, "[" + actionId
559                        + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
560                        + coordAction.getStatus());
561            }
562    
563            // if eligible to do action input check when running with backward support is true
564            if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
565                return;
566            }
567    
568            if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED
569                    && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
570                throw new PreconditionException(
571                        ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
572                                    " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
573                                + coordJob.getStatus());
574            }
575        }
576    
577        /* (non-Javadoc)
578         * @see org.apache.oozie.command.XCommand#getKey()
579         */
580        @Override
581        public String getKey(){
582            return getName() + "_" + actionId;
583        }
584    
585    }