001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.text.ParseException;
025import java.util.Calendar;
026import java.util.Date;
027import java.util.List;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.oozie.CoordinatorActionBean;
031import org.apache.oozie.CoordinatorJobBean;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.client.CoordinatorAction;
034import org.apache.oozie.client.Job;
035import org.apache.oozie.client.OozieClient;
036import org.apache.oozie.command.CommandException;
037import org.apache.oozie.command.PreconditionException;
038import org.apache.oozie.coord.CoordELEvaluator;
039import org.apache.oozie.coord.CoordELFunctions;
040import org.apache.oozie.coord.TimeUnit;
041import org.apache.oozie.dependency.URIHandler;
042import org.apache.oozie.dependency.URIHandlerException;
043import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
044import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
045import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
046import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
047import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
048import org.apache.oozie.executor.jpa.JPAExecutorException;
049import org.apache.oozie.service.CallableQueueService;
050import org.apache.oozie.service.EventHandlerService;
051import org.apache.oozie.service.JPAService;
052import org.apache.oozie.service.Service;
053import org.apache.oozie.service.Services;
054import org.apache.oozie.service.URIHandlerService;
055import org.apache.oozie.util.DateUtils;
056import org.apache.oozie.util.ELEvaluator;
057import org.apache.oozie.util.LogUtils;
058import org.apache.oozie.util.ParamChecker;
059import org.apache.oozie.util.StatusUtils;
060import org.apache.oozie.util.XConfiguration;
061import org.apache.oozie.util.XLog;
062import org.apache.oozie.util.XmlUtils;
063import org.jdom.Element;
064
065/**
066 * The command to check if an action's data input paths exist in the file system.
067 */
068public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
069
070    private final String actionId;
071    /**
072     * Property name of command re-queue interval for coordinator action input check in
073     * milliseconds.
074     */
075    public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
076            + "coord.input.check.requeue.interval";
077    /**
078     * Default re-queue interval in ms. It is applied when no value defined in
079     * the oozie configuration.
080     */
081    private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
082    private CoordinatorActionBean coordAction = null;
083    private CoordinatorJobBean coordJob = null;
084    private JPAService jpaService = null;
085    private String jobId = null;
086
087    public CoordActionInputCheckXCommand(String actionId, String jobId) {
088        super("coord_action_input", "coord_action_input", 1);
089        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
090        this.jobId = jobId;
091    }
092
093    /**
094     * Computes the nominal time of the next action.
095     * Based on CoordMaterializeTransitionXCommand#materializeActions
096     *
097     * @return the nominal time of the next action
098     * @throws ParseException
099     */
100    private Date computeNextNominalTime() throws ParseException {
101        Date nextNominalTime;
102        boolean isCronFrequency = false;
103        int freq = -1;
104        try {
105            freq = Integer.parseInt(coordJob.getFrequency());
106        } catch (NumberFormatException e) {
107            isCronFrequency = true;
108        }
109
110        if (isCronFrequency) {
111            nextNominalTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(coordAction.getNominalTime(), coordJob);
112        } else {
113            Calendar nextNominalTimeCal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
114            nextNominalTimeCal.setTime(coordAction.getNominalTime());
115            TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
116            nextNominalTimeCal.add(freqTU.getCalendarUnit(), freq);
117            nextNominalTime = nextNominalTimeCal.getTime();
118        }
119
120        // If the next nominal time is after the job's end time, then this is the last action, so return null
121        if (nextNominalTime.after(coordJob.getEndTime())) {
122            nextNominalTime = null;
123        }
124        return nextNominalTime;
125    }
126
127    @Override
128    protected Void execute() throws CommandException {
129        LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
130
131        // this action should only get processed if current time > nominal time;
132        // otherwise, requeue this action for delay execution;
133        Date nominalTime = coordAction.getNominalTime();
134        Date currentTime = new Date();
135        if (nominalTime.compareTo(currentTime) > 0) {
136            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
137                    .getTime()), getCoordInputCheckRequeueInterval()));
138            updateCoordAction(coordAction, false);
139            LOG.info("[" + actionId
140                    + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
141                    + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
142
143            return null;
144        }
145
146        StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
147        boolean isChangeInDependency = false;
148        try {
149            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
150            Date now = new Date();
151            if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
152                Date nextNominalTime = computeNextNominalTime();
153                if (nextNominalTime != null) {
154                    // If the current time is after the next action's nominal time, then we've passed the window where this action
155                    // should be started; so set it to SKIPPED
156                    if (now.after(nextNominalTime)) {
157                        LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
158                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
159                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
160                        queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
161                        return null;
162                    } else {
163                        LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
164                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
165                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
166                    }
167                }
168            }
169            else if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.NONE)) {
170                // If the current time is after the nominal time of this action plus some tolerance,
171                // then we've passed the window where this action
172                // should be started; so set it to SKIPPED
173                Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
174                cal.setTime(nominalTime);
175                cal.add(Calendar.MINUTE, Services.get().getConf().getInt("oozie.coord.execution.none.tolerance", 1));
176                nominalTime = cal.getTime();
177                if (now.after(nominalTime)) {
178                    LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
179                            + "the nominal time [{2}] of the current action]", coordAction.getId(),
180                            DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nominalTime));
181                    queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
182                    return null;
183                } else {
184                    LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
185                            + "the nominal time [{2}] of the current action]", coordAction.getId(),
186                            DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(coordAction.getNominalTime()));
187                }
188            }
189
190            StringBuilder existList = new StringBuilder();
191            StringBuilder nonExistList = new StringBuilder();
192            StringBuilder nonResolvedList = new StringBuilder();
193            String firstMissingDependency = "";
194            String missingDeps = coordAction.getMissingDependencies();
195            CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList);
196
197            // For clarity regarding which is the missing dependency in synchronous order
198            // instead of printing entire list, some of which, may be available
199            if(nonExistList.length() > 0) {
200                firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
201            }
202            LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
203                    + nonResolvedList.toString());
204            // Updating the list of data dependencies that are available and those that are yet not
205            boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
206            String pushDeps = coordAction.getPushMissingDependencies();
207            // Resolve latest/future only when all current missingDependencies and
208            // pushMissingDependencies are met
209            if (status && nonResolvedList.length() > 0) {
210                status = (pushDeps == null || pushDeps.length() == 0) ? checkUnResolvedInput(actionXml, actionConf)
211                        : false;
212            }
213            coordAction.setLastModifiedTime(currentTime);
214            coordAction.setActionXml(actionXml.toString());
215            if (nonResolvedList.length() > 0 && status == false) {
216                nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
217            }
218            String nonExistListStr = nonExistList.toString();
219            if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
220                // missingDeps null or empty means action should become READY
221                isChangeInDependency = true;
222                coordAction.setMissingDependencies(nonExistListStr);
223            }
224            if (status && (pushDeps == null || pushDeps.length() == 0)) {
225                String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId);
226                actionXml.replace(0, actionXml.length(), newActionXml);
227                coordAction.setActionXml(actionXml.toString());
228                coordAction.setStatus(CoordinatorAction.Status.READY);
229                updateCoordAction(coordAction, true);
230                new CoordActionReadyXCommand(coordAction.getJobId()).call(getEntityKey());
231            }
232            else if (!isTimeout(currentTime)) {
233                if (status == false) {
234                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
235                            getCoordInputCheckRequeueInterval());
236                }
237                updateCoordAction(coordAction, isChangeInDependency);
238            }
239            else {
240                if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) {
241                    queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
242                }
243                else {
244                    // Let CoordPushDependencyCheckXCommand queue the timeout
245                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
246                }
247                updateCoordAction(coordAction, isChangeInDependency);
248            }
249        }
250        catch (Exception e) {
251            if (isTimeout(currentTime)) {
252                LOG.debug("Queueing timeout command");
253                // XCommand.queue() will not work when there is a Exception
254                Services.get().get(CallableQueueService.class)
255                        .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
256            }
257            updateCoordAction(coordAction, isChangeInDependency);
258            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
259        }
260        return null;
261    }
262
263
264    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId) throws Exception {
265        Element eAction = XmlUtils.parseXml(actionXml.toString());
266        ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId);
267        materializeDataProperties(eAction, actionConf, eval);
268        return XmlUtils.prettyPrint(eAction).toString();
269    }
270
271    private boolean isTimeout(Date currentTime) {
272        long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
273                .getCreatedTime().getTime()))
274                / (60 * 1000);
275        int timeOut = coordAction.getTimeOut();
276        return (timeOut >= 0) && (waitingTime > timeOut);
277    }
278
279    private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
280            throws CommandException {
281        coordAction.setLastModifiedTime(new Date());
282        if (jpaService != null) {
283            try {
284                if (isChangeInDependency) {
285                    CoordActionQueryExecutor.getInstance().executeUpdate(
286                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, coordAction);
287                    if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
288                        // since event is not to be generated unless action
289                        // RUNNING via StartX
290                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
291                    }
292                }
293                else {
294                    CoordActionQueryExecutor.getInstance().executeUpdate(
295                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
296                }
297            }
298            catch (JPAExecutorException jex) {
299                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
300            }
301        }
302    }
303
304    /**
305     * This function reads the value of re-queue interval for coordinator input
306     * check command from the Oozie configuration provided by Configuration
307     * Service. If nothing defined in the configuration, it uses the code
308     * specified default value.
309     *
310     * @return re-queue interval in ms
311     */
312    public long getCoordInputCheckRequeueInterval() {
313        long requeueInterval = Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
314                DEFAULT_COMMAND_REQUEUE_INTERVAL);
315        return requeueInterval;
316    }
317
318    /**
319     * To check the list of input paths if all of them exist
320     *
321     * @param actionXml action xml
322     * @param existList the list of existed paths
323     * @param nonExistList the list of non existed paths
324     * @param conf action configuration
325     * @return true if all input paths are existed
326     * @throws Exception thrown of unable to check input path
327     */
328    protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
329            Configuration conf) throws Exception {
330        Element eAction = XmlUtils.parseXml(actionXml.toString());
331        return checkResolvedUris(eAction, existList, nonExistList, conf);
332    }
333
334    protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
335        Element eAction = XmlUtils.parseXml(actionXml.toString());
336        LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
337        boolean allExist = checkUnresolvedInstances(eAction, conf);
338        if (allExist) {
339            actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
340        }
341        return allExist;
342    }
343
344    /**
345     * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
346     * of files that will be needed.
347     *
348     * @param eAction action element
349     * @param conf action configuration
350     * @throws Exception thrown if failed to resolve data properties
351     * @update modify 'Action' element with appropriate list of files.
352     */
353    @SuppressWarnings("unchecked")
354    static void materializeDataProperties(Element eAction, Configuration conf, ELEvaluator eval) throws Exception {
355        Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
356                eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
357        if (configElem != null) {
358            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
359                resolveTagContents("value", propElem, eval);
360            }
361        }
362    }
363
364    /**
365     * To resolve property value which contains el functions
366     *
367     * @param tagName tag name
368     * @param elem the child element of "property" element
369     * @param eval el functions evaluator
370     * @throws Exception thrown if unable to resolve tag value
371     */
372    private static void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
373        if (elem == null) {
374            return;
375        }
376        Element tagElem = elem.getChild(tagName, elem.getNamespace());
377        if (tagElem != null) {
378            String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
379            tagElem.removeContent();
380            tagElem.addContent(updated);
381        }
382        else {
383            XLog.getLog(CoordActionInputCheckXCommand.class).warn(" Value NOT FOUND " + tagName);
384        }
385    }
386
387    /**
388     * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
389     *
390     * @param eAction action element
391     * @param actionConf action configuration
392     * @return true if successful to resolve input and output paths
393     * @throws Exception thrown if failed to resolve data input and output paths
394     */
395    @SuppressWarnings("unchecked")
396    private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
397        String strAction = XmlUtils.prettyPrint(eAction).toString();
398        Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
399        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
400        Date actualTime = null;
401        if (actualTimeStr == null) {
402            LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
403            "from previous version. Assign current date to actual time, action = " + actionId);
404            actualTime = new Date();
405        } else {
406            actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
407        }
408
409        StringBuffer resultedXml = new StringBuffer();
410
411        boolean ret;
412        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
413        if (inputList != null) {
414            ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
415                    actualTime, actionConf);
416            if (ret == false) {
417                resultedXml.append(strAction);
418                return false;
419            }
420        }
421
422        // Using latest() or future() in output-event is not intuitive.
423        // We need to make sure, this assumption is correct.
424        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
425        if (outputList != null) {
426            for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
427                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) != null) {
428                    throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
429                            " not permitted in output-event ");
430                }
431            }
432        }
433        return true;
434    }
435
436    /**
437     * Resolve the list of data input paths
438     *
439     * @param eDataEvents the list of data input elements
440     * @param nominalTime action nominal time
441     * @param actualTime current time
442     * @param conf action configuration
443     * @return true if all unresolved URIs can be resolved
444     * @throws Exception thrown if failed to resolve data input paths
445     */
446    @SuppressWarnings("unchecked")
447    private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
448            Configuration conf) throws Exception {
449        for (Element dEvent : eDataEvents) {
450            if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) == null) {
451                continue;
452            }
453            ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
454            String uresolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()).getTextTrim();
455            String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
456            StringBuffer resolvedTmp = new StringBuffer();
457            for (int i = 0; i < unresolvedList.length; i++) {
458                String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
459                Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
460                if (isResolved == false) {
461                    LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
462                    return false;
463                }
464                if (resolvedTmp.length() > 0) {
465                    resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
466                }
467                resolvedTmp.append((String) eval.getVariable("resolved_path"));
468            }
469            if (resolvedTmp.length() > 0) {
470                if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
471                    resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
472                            dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
473                    dEvent.removeChild("uris", dEvent.getNamespace());
474                }
475                Element uriInstance = new Element("uris", dEvent.getNamespace());
476                uriInstance.addContent(resolvedTmp.toString());
477                dEvent.getContent().add(1, uriInstance);
478            }
479            dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace());
480        }
481
482        return true;
483    }
484
485    /**
486     * Check all resolved URIs existence
487     *
488     * @param eAction action element
489     * @param existList the list of existed paths
490     * @param nonExistList the list of paths to check existence
491     * @param conf action configuration
492     * @return true if all nonExistList paths exist
493     * @throws IOException thrown if unable to access the path
494     */
495    private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
496            Configuration conf) throws IOException {
497        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
498        if (inputList != null) {
499            if (nonExistList.length() > 0) {
500                checkListOfPaths(existList, nonExistList, conf);
501            }
502            return nonExistList.length() == 0;
503        }
504        return true;
505    }
506
507    /**
508     * Check a list of non existed paths and add to exist list if it exists
509     *
510     * @param existList the list of existed paths
511     * @param nonExistList the list of paths to check existence
512     * @param conf action configuration
513     * @return true if all nonExistList paths exist
514     * @throws IOException thrown if unable to access the path
515     */
516    private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
517            throws IOException {
518
519        String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
520        if (uriList[0] != null) {
521            LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
522        }
523
524        nonExistList.delete(0, nonExistList.length());
525        boolean allExists = true;
526        String existSeparator = "", nonExistSeparator = "";
527        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
528        for (int i = 0; i < uriList.length; i++) {
529            if (allExists) {
530                allExists = pathExists(uriList[i], conf, user);
531                LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
532            }
533            if (allExists) {
534                existList.append(existSeparator).append(uriList[i]);
535                existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
536            }
537            else {
538                nonExistList.append(nonExistSeparator).append(uriList[i]);
539                nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
540            }
541        }
542        return allExists;
543    }
544
545    /**
546     * Check if given path exists
547     *
548     * @param sPath uri path
549     * @param actionConf action configuration
550     * @return true if path exists
551     * @throws IOException thrown if unable to access the path
552     */
553    protected boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException {
554        LOG.debug("checking for the file " + sPath);
555        try {
556            URI uri = new URI(sPath);
557            URIHandlerService service = Services.get().get(URIHandlerService.class);
558            URIHandler handler = service.getURIHandler(uri);
559            return handler.exists(uri, actionConf, user);
560        }
561        catch (URIHandlerException e) {
562            coordAction.setErrorCode(e.getErrorCode().toString());
563            coordAction.setErrorMessage(e.getMessage());
564            throw new IOException(e);
565        } catch (URISyntaxException e) {
566            coordAction.setErrorCode(ErrorCode.E0906.toString());
567            coordAction.setErrorMessage(e.getMessage());
568            throw new IOException(e);
569        }
570    }
571
572    /**
573     * The function create a list of URIs separated by "," using the instances time stamp and URI-template
574     *
575     * @param event : <data-in> event
576     * @param instances : List of time stamp seprated by ","
577     * @param unresolvedInstances : list of instance with latest/future function
578     * @return : list of URIs separated by ",".
579     * @throws Exception thrown if failed to create URIs from unresolvedInstances
580     */
581    @SuppressWarnings("unused")
582    private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
583        if (instances == null || instances.length() == 0) {
584            return "";
585        }
586        String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
587        StringBuilder uris = new StringBuilder();
588
589        for (int i = 0; i < instanceList.length; i++) {
590            int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
591            if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
592                if (unresolvedInstances.length() > 0) {
593                    unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
594                }
595                unresolvedInstances.append(instanceList[i]);
596                continue;
597            }
598            ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
599            if (uris.length() > 0) {
600                uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
601            }
602            uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
603                    "uri-template", event.getNamespace()).getTextTrim()));
604        }
605        return uris.toString();
606    }
607
608    /**
609     * getting the error code of the coord action. (used mainly for unit testing)
610     */
611    protected String getCoordActionErrorCode() {
612        if (coordAction != null) {
613            return coordAction.getErrorCode();
614        }
615        return null;
616    }
617
618    /**
619     * getting the error message of the coord action. (used mainly for unit testing)
620     */
621    protected String getCoordActionErrorMsg() {
622        if (coordAction != null) {
623            return coordAction.getErrorMessage();
624        }
625        return null;
626    }
627
628    @Override
629    public String getEntityKey() {
630        return this.jobId;
631    }
632
633    @Override
634    protected boolean isLockRequired() {
635        return true;
636    }
637
638    @Override
639    protected void loadState() throws CommandException {
640        if (jpaService == null) {
641            jpaService = Services.get().get(JPAService.class);
642        }
643        try {
644            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
645            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_INPUT_CHECK,
646                    coordAction.getJobId());
647        }
648        catch (JPAExecutorException je) {
649            throw new CommandException(je);
650        }
651        LogUtils.setLogInfo(coordAction, logInfo);
652    }
653
654    @Override
655    protected void verifyPrecondition() throws CommandException, PreconditionException {
656        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
657            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
658                    + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
659                    + coordAction.getStatus());
660        }
661
662        // if eligible to do action input check when running with backward support is true
663        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
664            return;
665        }
666
667        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED
668                && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
669            throw new PreconditionException(
670                    ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
671                                " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
672                            + coordJob.getStatus());
673        }
674    }
675
676    @Override
677    public String getKey(){
678        return getName() + "_" + actionId;
679    }
680
681}