001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.Date;
023    import java.util.List;
024    
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.fs.Path;
027    import org.apache.oozie.CoordinatorActionBean;
028    import org.apache.oozie.CoordinatorJobBean;
029    import org.apache.oozie.ErrorCode;
030    import org.apache.oozie.client.CoordinatorAction;
031    import org.apache.oozie.client.Job;
032    import org.apache.oozie.client.OozieClient;
033    import org.apache.oozie.command.CommandException;
034    import org.apache.oozie.command.PreconditionException;
035    import org.apache.oozie.coord.CoordELEvaluator;
036    import org.apache.oozie.coord.CoordELFunctions;
037    import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
038    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
039    import org.apache.oozie.executor.jpa.JPAExecutorException;
040    import org.apache.oozie.service.HadoopAccessorException;
041    import org.apache.oozie.service.HadoopAccessorService;
042    import org.apache.oozie.service.JPAService;
043    import org.apache.oozie.service.Service;
044    import org.apache.oozie.service.Services;
045    import org.apache.oozie.util.DateUtils;
046    import org.apache.oozie.util.ELEvaluator;
047    import org.apache.oozie.util.Instrumentation;
048    import org.apache.oozie.util.LogUtils;
049    import org.apache.oozie.util.ParamChecker;
050    import org.apache.oozie.util.StatusUtils;
051    import org.apache.oozie.util.XConfiguration;
052    import org.apache.oozie.util.XmlUtils;
053    import org.jdom.Element;
054    
055    /**
056     * The command to check if an action's data input paths exist in the file system.
057     */
058    public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
059    
060        private final String actionId;
061        /**
062         * Property name of command re-queue interval for coordinator action input check in
063         * milliseconds.
064         */
065        public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
066                + "coord.input.check.requeue.interval";
067        /**
068         * Default re-queue interval in ms. It is applied when no value defined in
069         * the oozie configuration.
070         */
071        private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
072        private CoordinatorActionBean coordAction = null;
073        private CoordinatorJobBean coordJob = null;
074        private JPAService jpaService = null;
075        private String jobId = null;
076    
077        public CoordActionInputCheckXCommand(String actionId, String jobId) {
078            super("coord_action_input", "coord_action_input", 1);
079            this.actionId = ParamChecker.notEmpty(actionId, "actionId");
080            this.jobId = jobId;
081        }
082    
083        /* (non-Javadoc)
084         * @see org.apache.oozie.command.XCommand#execute()
085         */
086        @Override
087        protected Void execute() throws CommandException {
088            LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
089    
090            // this action should only get processed if current time > nominal time;
091            // otherwise, requeue this action for delay execution;
092            Date nominalTime = coordAction.getNominalTime();
093            Date currentTime = new Date();
094            if (nominalTime.compareTo(currentTime) > 0) {
095                queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
096                        .getTime()), getCoordInputCheckRequeueInterval()));
097                // update lastModifiedTime
098                coordAction.setLastModifiedTime(new Date());
099                try {
100                    jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(coordAction));
101                }
102                catch (JPAExecutorException e) {
103                    throw new CommandException(e);
104                }
105                LOG.info("[" + actionId
106                        + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
107                        + currentTime + ", nominal=" + nominalTime);
108    
109                return null;
110            }
111    
112            StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
113            Instrumentation.Cron cron = new Instrumentation.Cron();
114            try {
115                Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
116                cron.start();
117                StringBuilder existList = new StringBuilder();
118                StringBuilder nonExistList = new StringBuilder();
119                StringBuilder nonResolvedList = new StringBuilder();
120                CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(), nonExistList, nonResolvedList);
121    
122                LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + nonExistList.toString() + " "
123                        + nonResolvedList.toString());
124                boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
125                coordAction.setLastModifiedTime(currentTime);
126                coordAction.setActionXml(actionXml.toString());
127                if (nonResolvedList.length() > 0 && status == false) {
128                    nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
129                }
130                coordAction.setMissingDependencies(nonExistList.toString());
131                if (status == true) {
132                    coordAction.setStatus(CoordinatorAction.Status.READY);
133                    // pass jobID to the CoordActionReadyXCommand
134                    queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
135                }
136                else {
137                    long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
138                            .getCreatedTime().getTime()))
139                            / (60 * 1000);
140                    int timeOut = coordAction.getTimeOut();
141                    if ((timeOut >= 0) && (waitingTime > timeOut)) {
142                        queue(new CoordActionTimeOutXCommand(coordAction), 100);
143                    }
144                    else {
145                        queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval());
146                    }
147                }
148                coordAction.setLastModifiedTime(new Date());
149                jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(coordAction));
150            }
151            catch (Exception e) {
152                throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
153            }
154            cron.stop();
155    
156            return null;
157        }
158    
159        /**
160         * This function reads the value of re-queue interval for coordinator input
161         * check command from the Oozie configuration provided by Configuration
162         * Service. If nothing defined in the configuration, it uses the code
163         * specified default value.
164         *
165         * @return re-queue interval in ms
166         */
167        public long getCoordInputCheckRequeueInterval() {
168            long requeueInterval = Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
169                    DEFAULT_COMMAND_REQUEUE_INTERVAL);
170            return requeueInterval;
171        }
172    
173        /**
174         * To check the list of input paths if all of them exist
175         *
176         * @param actionXml action xml
177         * @param existList the list of existed paths
178         * @param nonExistList the list of non existed paths
179         * @param conf action configuration
180         * @return true if all input paths are existed
181         * @throws Exception thrown of unable to check input path
182         */
183        protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
184                Configuration conf) throws Exception {
185            Element eAction = XmlUtils.parseXml(actionXml.toString());
186            boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
187            if (allExist) {
188                LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
189                allExist = checkUnresolvedInstances(eAction, conf);
190            }
191            if (allExist == true) {
192                materializeDataProperties(eAction, conf);
193                actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
194            }
195            return allExist;
196        }
197    
198        /**
199         * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
200         * of files that will be needed.
201         *
202         * @param eAction action element
203         * @param conf action configuration
204         * @throws Exception thrown if failed to resolve data properties
205         * @update modify 'Action' element with appropriate list of files.
206         */
207        @SuppressWarnings("unchecked")
208        private void materializeDataProperties(Element eAction, Configuration conf) throws Exception {
209            ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf, actionId);
210            Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
211                    eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
212            if (configElem != null) {
213                for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
214                    resolveTagContents("value", propElem, eval);
215                }
216            }
217        }
218    
219        /**
220         * To resolve property value which contains el functions
221         *
222         * @param tagName tag name
223         * @param elem the child element of "property" element
224         * @param eval el functions evaluator
225         * @throws Exception thrown if unable to resolve tag value
226         */
227        private void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
228            if (elem == null) {
229                return;
230            }
231            Element tagElem = elem.getChild(tagName, elem.getNamespace());
232            if (tagElem != null) {
233                String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
234                tagElem.removeContent();
235                tagElem.addContent(updated);
236            }
237            else {
238                LOG.warn(" Value NOT FOUND " + tagName);
239            }
240        }
241    
242        /**
243         * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
244         *
245         * @param eAction action element
246         * @param actionConf action configuration
247         * @return true if successful to resolve input and output paths
248         * @throws Exception thrown if failed to resolve data input and output paths
249         */
250        @SuppressWarnings("unchecked")
251        private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
252            String strAction = XmlUtils.prettyPrint(eAction).toString();
253            Date nominalTime = DateUtils.parseDateUTC(eAction.getAttributeValue("action-nominal-time"));
254            String actualTimeStr = eAction.getAttributeValue("action-actual-time");
255            Date actualTime = null;
256            if (actualTimeStr == null) {
257                LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
258                "from previous version. Assign current date to actual time, action = " + actionId);
259                actualTime = new Date();
260            } else {
261                actualTime = DateUtils.parseDateUTC(actualTimeStr);
262            }
263    
264            StringBuffer resultedXml = new StringBuffer();
265    
266            boolean ret;
267            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
268            if (inputList != null) {
269                ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
270                        actualTime, actionConf);
271                if (ret == false) {
272                    resultedXml.append(strAction);
273                    return false;
274                }
275            }
276    
277            // Using latest() or future() in output-event is not intuitive.
278            // We need to make sure, this assumption is correct.
279            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
280            if (outputList != null) {
281                for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
282                    if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
283                        throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
284                                " not permitted in output-event ");
285                    }
286                }
287            }
288            return true;
289        }
290    
291        /**
292         * Resolve the list of data input paths
293         *
294         * @param eDataEvents the list of data input elements
295         * @param nominalTime action nominal time
296         * @param actualTime current time
297         * @param conf action configuration
298         * @return true if all unresolved URIs can be resolved
299         * @throws Exception thrown if failed to resolve data input paths
300         */
301        @SuppressWarnings("unchecked")
302        private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
303                Configuration conf) throws Exception {
304            for (Element dEvent : eDataEvents) {
305                if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
306                    continue;
307                }
308                ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
309                String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
310                String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
311                StringBuffer resolvedTmp = new StringBuffer();
312                for (int i = 0; i < unresolvedList.length; i++) {
313                    String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
314                    Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
315                    if (isResolved == false) {
316                        LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
317                        return false;
318                    }
319                    if (resolvedTmp.length() > 0) {
320                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
321                    }
322                    resolvedTmp.append((String) eval.getVariable("resolved_path"));
323                }
324                if (resolvedTmp.length() > 0) {
325                    if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
326                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
327                                dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
328                        dEvent.removeChild("uris", dEvent.getNamespace());
329                    }
330                    Element uriInstance = new Element("uris", dEvent.getNamespace());
331                    uriInstance.addContent(resolvedTmp.toString());
332                    dEvent.getContent().add(1, uriInstance);
333                }
334                dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
335            }
336    
337            return true;
338        }
339    
340        /**
341         * Check all resolved URIs existence
342         *
343         * @param eAction action element
344         * @param existList the list of existed paths
345         * @param nonExistList the list of paths to check existence
346         * @param conf action configuration
347         * @return true if all nonExistList paths exist
348         * @throws IOException thrown if unable to access the path
349         */
350        private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
351                Configuration conf) throws IOException {
352            LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
353            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
354            if (inputList != null) {
355                if (nonExistList.length() > 0) {
356                    checkListOfPaths(existList, nonExistList, conf);
357                }
358                return nonExistList.length() == 0;
359            }
360            return true;
361        }
362    
363        /**
364         * Check a list of non existed paths and add to exist list if it exists
365         *
366         * @param existList the list of existed paths
367         * @param nonExistList the list of paths to check existence
368         * @param conf action configuration
369         * @return true if all nonExistList paths exist
370         * @throws IOException thrown if unable to access the path
371         */
372        private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
373                throws IOException {
374    
375            String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
376            if (uriList[0] != null) {
377                LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
378            }
379    
380            nonExistList.delete(0, nonExistList.length());
381            boolean allExists = true;
382            String existSeparator = "", nonExistSeparator = "";
383            for (int i = 0; i < uriList.length; i++) {
384                if (allExists) {
385                    allExists = pathExists(uriList[i], conf);
386                    LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
387                }
388                if (allExists) {
389                    existList.append(existSeparator).append(uriList[i]);
390                    existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
391                }
392                else {
393                    nonExistList.append(nonExistSeparator).append(uriList[i]);
394                    nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
395                }
396            }
397            return allExists;
398        }
399    
400        /**
401         * Check if given path exists
402         *
403         * @param sPath uri path
404         * @param actionConf action configuration
405         * @return true if path exists
406         * @throws IOException thrown if unable to access the path
407         */
408        private boolean pathExists(String sPath, Configuration actionConf) throws IOException {
409            LOG.debug("checking for the file " + sPath);
410            Path path = new Path(sPath);
411            String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
412            try {
413                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
414                Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
415                return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
416            }
417            catch (HadoopAccessorException e) {
418                throw new IOException(e);
419            }
420        }
421    
422        /**
423         * The function create a list of URIs separated by "," using the instances time stamp and URI-template
424         *
425         * @param event : <data-in> event
426         * @param instances : List of time stamp seprated by ","
427         * @param unresolvedInstances : list of instance with latest/future function
428         * @return : list of URIs separated by ",".
429         * @throws Exception thrown if failed to create URIs from unresolvedInstances
430         */
431        @SuppressWarnings("unused")
432        private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
433            if (instances == null || instances.length() == 0) {
434                return "";
435            }
436            String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
437            StringBuilder uris = new StringBuilder();
438    
439            for (int i = 0; i < instanceList.length; i++) {
440                int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
441                if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
442                    if (unresolvedInstances.length() > 0) {
443                        unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
444                    }
445                    unresolvedInstances.append(instanceList[i]);
446                    continue;
447                }
448                ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
449                if (uris.length() > 0) {
450                    uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
451                }
452                uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
453                        "uri-template", event.getNamespace()).getTextTrim()));
454            }
455            return uris.toString();
456        }
457    
458        /* (non-Javadoc)
459         * @see org.apache.oozie.command.XCommand#getEntityKey()
460         */
461        @Override
462        public String getEntityKey() {
463            return this.jobId;
464        }
465    
466        /* (non-Javadoc)
467         * @see org.apache.oozie.command.XCommand#isLockRequired()
468         */
469        @Override
470        protected boolean isLockRequired() {
471            return true;
472        }
473    
474        /* (non-Javadoc)
475         * @see org.apache.oozie.command.XCommand#eagerLoadState()
476         */
477        @Override
478        protected void eagerLoadState() throws CommandException {
479            loadState();
480        }
481    
482        /* (non-Javadoc)
483         * @see org.apache.oozie.command.XCommand#loadState()
484         */
485        @Override
486        protected void loadState() throws CommandException {
487            if (jpaService == null) {
488                jpaService = Services.get().get(JPAService.class);
489            }
490            try {
491                coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
492                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
493            }
494            catch (JPAExecutorException je) {
495                throw new CommandException(je);
496            }
497            LogUtils.setLogInfo(coordAction, logInfo);
498        }
499    
500        /* (non-Javadoc)
501         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
502         */
503        @Override
504        protected void verifyPrecondition() throws CommandException, PreconditionException {
505            if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
506                throw new PreconditionException(ErrorCode.E1100, "[" + actionId
507                        + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
508                        + coordAction.getStatus());
509            }
510    
511            // if eligible to do action input check when running with backward support is true
512            if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
513                return;
514            }
515    
516            if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.PAUSED
517                    && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
518                throw new PreconditionException(
519                        ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
520                                    " Coordinator job is not in RUNNING/PAUSED/PAUSEDWITHERROR state, but state="
521                                + coordJob.getStatus());
522            }
523        }
524    
525        /* (non-Javadoc)
526         * @see org.apache.oozie.command.XCommand#getKey()
527         */
528        @Override
529        public String getKey(){
530            return getName() + "_" + actionId;
531        }
532    
533    }