001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.Date;
025    import java.util.List;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.oozie.CoordinatorActionBean;
029    import org.apache.oozie.CoordinatorJobBean;
030    import org.apache.oozie.ErrorCode;
031    import org.apache.oozie.client.CoordinatorAction;
032    import org.apache.oozie.client.Job;
033    import org.apache.oozie.client.OozieClient;
034    import org.apache.oozie.command.CommandException;
035    import org.apache.oozie.command.PreconditionException;
036    import org.apache.oozie.coord.CoordELEvaluator;
037    import org.apache.oozie.coord.CoordELFunctions;
038    import org.apache.oozie.dependency.URIHandler;
039    import org.apache.oozie.dependency.URIHandlerException;
040    import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
041    import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
042    import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
043    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
044    import org.apache.oozie.executor.jpa.JPAExecutorException;
045    import org.apache.oozie.service.CallableQueueService;
046    import org.apache.oozie.service.EventHandlerService;
047    import org.apache.oozie.service.JPAService;
048    import org.apache.oozie.service.Service;
049    import org.apache.oozie.service.Services;
050    import org.apache.oozie.service.URIHandlerService;
051    import org.apache.oozie.util.DateUtils;
052    import org.apache.oozie.util.ELEvaluator;
053    import org.apache.oozie.util.Instrumentation;
054    import org.apache.oozie.util.LogUtils;
055    import org.apache.oozie.util.ParamChecker;
056    import org.apache.oozie.util.StatusUtils;
057    import org.apache.oozie.util.XConfiguration;
058    import org.apache.oozie.util.XLog;
059    import org.apache.oozie.util.XmlUtils;
060    import org.jdom.Element;
061    
062    /**
063     * The command to check if an action's data input paths exist in the file system.
064     */
065    public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
066    
067        private final String actionId;
068        /**
069         * Property name of command re-queue interval for coordinator action input check in
070         * milliseconds.
071         */
072        public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
073                + "coord.input.check.requeue.interval";
074        /**
075         * Default re-queue interval in ms. It is applied when no value defined in
076         * the oozie configuration.
077         */
078        private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 60000; // 1 minute
079        private CoordinatorActionBean coordAction = null;
080        private CoordinatorJobBean coordJob = null;
081        private JPAService jpaService = null;
082        private String jobId = null;
083    
084        public CoordActionInputCheckXCommand(String actionId, String jobId) {
085            super("coord_action_input", "coord_action_input", 1);
086            this.actionId = ParamChecker.notEmpty(actionId, "actionId");
087            this.jobId = jobId;
088        }
089    
090        @Override
091        protected Void execute() throws CommandException {
092            LOG.info("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
093    
094            // this action should only get processed if current time > nominal time;
095            // otherwise, requeue this action for delay execution;
096            Date nominalTime = coordAction.getNominalTime();
097            Date currentTime = new Date();
098            if (nominalTime.compareTo(currentTime) > 0) {
099                queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
100                        .getTime()), getCoordInputCheckRequeueInterval()));
101                updateCoordAction(coordAction, false);
102                LOG.info("[" + actionId
103                        + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
104                        + currentTime + ", nominal=" + nominalTime);
105    
106                return null;
107            }
108    
109            StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
110            Instrumentation.Cron cron = new Instrumentation.Cron();
111            boolean isChangeInDependency = false;
112            try {
113                Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
114                cron.start();
115                StringBuilder existList = new StringBuilder();
116                StringBuilder nonExistList = new StringBuilder();
117                StringBuilder nonResolvedList = new StringBuilder();
118                String firstMissingDependency = "";
119                String missingDeps = coordAction.getMissingDependencies();
120                CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList);
121    
122                // For clarity regarding which is the missing dependency in synchronous order
123                // instead of printing entire list, some of which, may be available
124                if(nonExistList.length() > 0) {
125                    firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
126                }
127                LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
128                        + nonResolvedList.toString());
129                // Updating the list of data dependencies that are available and those that are yet not
130                boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
131                String pushDeps = coordAction.getPushMissingDependencies();
132                // Resolve latest/future only when all current missingDependencies and
133                // pushMissingDependencies are met
134                if (status && nonResolvedList.length() > 0) {
135                    status = (pushDeps == null || pushDeps.length() == 0) ? checkUnResolvedInput(actionXml, actionConf)
136                            : false;
137                }
138                coordAction.setLastModifiedTime(currentTime);
139                coordAction.setActionXml(actionXml.toString());
140                if (nonResolvedList.length() > 0 && status == false) {
141                    nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
142                }
143                String nonExistListStr = nonExistList.toString();
144                if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
145                    // missingDeps null or empty means action should become READY
146                    isChangeInDependency = true;
147                    coordAction.setMissingDependencies(nonExistListStr);
148                }
149                if (status && (pushDeps == null || pushDeps.length() == 0)) {
150                    String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId);
151                    actionXml.replace(0, actionXml.length(), newActionXml);
152                    coordAction.setActionXml(actionXml.toString());
153                    coordAction.setStatus(CoordinatorAction.Status.READY);
154                    // pass jobID to the CoordActionReadyXCommand
155                    queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
156                }
157                else if (!isTimeout(currentTime)) {
158                    if (status == false) {
159                        queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
160                                getCoordInputCheckRequeueInterval());
161                    }
162                }
163                else {
164                    if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) {
165                        queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
166                    }
167                    else {
168                        // Let CoordPushDependencyCheckXCommand queue the timeout
169                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
170                    }
171                }
172            }
173            catch (Exception e) {
174                if (isTimeout(currentTime)) {
175                    LOG.debug("Queueing timeout command");
176                    // XCommand.queue() will not work when there is a Exception
177                    Services.get().get(CallableQueueService.class)
178                            .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
179                }
180                throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
181            }
182            finally {
183                cron.stop();
184                updateCoordAction(coordAction, isChangeInDependency);
185            }
186            return null;
187        }
188    
189    
190        static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId) throws Exception {
191            Element eAction = XmlUtils.parseXml(actionXml.toString());
192            ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId);
193            materializeDataProperties(eAction, actionConf, eval);
194            return XmlUtils.prettyPrint(eAction).toString();
195        }
196    
197        private boolean isTimeout(Date currentTime) {
198            long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
199                    .getCreatedTime().getTime()))
200                    / (60 * 1000);
201            int timeOut = coordAction.getTimeOut();
202            return (timeOut >= 0) && (waitingTime > timeOut);
203        }
204    
205        private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
206                throws CommandException {
207            coordAction.setLastModifiedTime(new Date());
208            if (jpaService != null) {
209                try {
210                    if (isChangeInDependency) {
211                        jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
212                        if (EventHandlerService.isEnabled()
213                                && coordAction.getStatus() != CoordinatorAction.Status.READY) {
214                            //since event is not to be generated unless action RUNNING via StartX
215                            generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
216                        }
217                    }
218                    else {
219                        jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
220                    }
221                }
222                catch (JPAExecutorException jex) {
223                    throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
224                }
225            }
226        }
227    
228        /**
229         * This function reads the value of re-queue interval for coordinator input
230         * check command from the Oozie configuration provided by Configuration
231         * Service. If nothing defined in the configuration, it uses the code
232         * specified default value.
233         *
234         * @return re-queue interval in ms
235         */
236        public long getCoordInputCheckRequeueInterval() {
237            long requeueInterval = Services.get().getConf().getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL,
238                    DEFAULT_COMMAND_REQUEUE_INTERVAL);
239            return requeueInterval;
240        }
241    
242        /**
243         * To check the list of input paths if all of them exist
244         *
245         * @param actionXml action xml
246         * @param existList the list of existed paths
247         * @param nonExistList the list of non existed paths
248         * @param conf action configuration
249         * @return true if all input paths are existed
250         * @throws Exception thrown of unable to check input path
251         */
252        protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
253                Configuration conf) throws Exception {
254            Element eAction = XmlUtils.parseXml(actionXml.toString());
255            return checkResolvedUris(eAction, existList, nonExistList, conf);
256        }
257    
258        protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
259            Element eAction = XmlUtils.parseXml(actionXml.toString());
260            LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
261            boolean allExist = checkUnresolvedInstances(eAction, conf);
262            if (allExist) {
263                actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
264            }
265            return allExist;
266        }
267    
268        /**
269         * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
270         * of files that will be needed.
271         *
272         * @param eAction action element
273         * @param conf action configuration
274         * @throws Exception thrown if failed to resolve data properties
275         * @update modify 'Action' element with appropriate list of files.
276         */
277        @SuppressWarnings("unchecked")
278        static void materializeDataProperties(Element eAction, Configuration conf, ELEvaluator eval) throws Exception {
279            Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
280                    eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
281            if (configElem != null) {
282                for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
283                    resolveTagContents("value", propElem, eval);
284                }
285            }
286        }
287    
288        /**
289         * To resolve property value which contains el functions
290         *
291         * @param tagName tag name
292         * @param elem the child element of "property" element
293         * @param eval el functions evaluator
294         * @throws Exception thrown if unable to resolve tag value
295         */
296        private static void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
297            if (elem == null) {
298                return;
299            }
300            Element tagElem = elem.getChild(tagName, elem.getNamespace());
301            if (tagElem != null) {
302                String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
303                tagElem.removeContent();
304                tagElem.addContent(updated);
305            }
306            else {
307                XLog.getLog(CoordActionInputCheckXCommand.class).warn(" Value NOT FOUND " + tagName);
308            }
309        }
310    
311        /**
312         * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
313         *
314         * @param eAction action element
315         * @param actionConf action configuration
316         * @return true if successful to resolve input and output paths
317         * @throws Exception thrown if failed to resolve data input and output paths
318         */
319        @SuppressWarnings("unchecked")
320        private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
321            String strAction = XmlUtils.prettyPrint(eAction).toString();
322            Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
323            String actualTimeStr = eAction.getAttributeValue("action-actual-time");
324            Date actualTime = null;
325            if (actualTimeStr == null) {
326                LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
327                "from previous version. Assign current date to actual time, action = " + actionId);
328                actualTime = new Date();
329            } else {
330                actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
331            }
332    
333            StringBuffer resultedXml = new StringBuffer();
334    
335            boolean ret;
336            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
337            if (inputList != null) {
338                ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
339                        actualTime, actionConf);
340                if (ret == false) {
341                    resultedXml.append(strAction);
342                    return false;
343                }
344            }
345    
346            // Using latest() or future() in output-event is not intuitive.
347            // We need to make sure, this assumption is correct.
348            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
349            if (outputList != null) {
350                for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
351                    if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) != null) {
352                        throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
353                                " not permitted in output-event ");
354                    }
355                }
356            }
357            return true;
358        }
359    
360        /**
361         * Resolve the list of data input paths
362         *
363         * @param eDataEvents the list of data input elements
364         * @param nominalTime action nominal time
365         * @param actualTime current time
366         * @param conf action configuration
367         * @return true if all unresolved URIs can be resolved
368         * @throws Exception thrown if failed to resolve data input paths
369         */
370        @SuppressWarnings("unchecked")
371        private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
372                Configuration conf) throws Exception {
373            for (Element dEvent : eDataEvents) {
374                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) == null) {
375                    continue;
376                }
377                ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
378                String uresolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()).getTextTrim();
379                String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
380                StringBuffer resolvedTmp = new StringBuffer();
381                for (int i = 0; i < unresolvedList.length; i++) {
382                    String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
383                    Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
384                    if (isResolved == false) {
385                        LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
386                        return false;
387                    }
388                    if (resolvedTmp.length() > 0) {
389                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
390                    }
391                    resolvedTmp.append((String) eval.getVariable("resolved_path"));
392                }
393                if (resolvedTmp.length() > 0) {
394                    if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
395                        resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
396                                dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
397                        dEvent.removeChild("uris", dEvent.getNamespace());
398                    }
399                    Element uriInstance = new Element("uris", dEvent.getNamespace());
400                    uriInstance.addContent(resolvedTmp.toString());
401                    dEvent.getContent().add(1, uriInstance);
402                }
403                dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace());
404            }
405    
406            return true;
407        }
408    
409        /**
410         * Check all resolved URIs existence
411         *
412         * @param eAction action element
413         * @param existList the list of existed paths
414         * @param nonExistList the list of paths to check existence
415         * @param conf action configuration
416         * @return true if all nonExistList paths exist
417         * @throws IOException thrown if unable to access the path
418         */
419        private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
420                Configuration conf) throws IOException {
421            LOG.info("[" + actionId + "]::ActionInputCheck:: In checkResolvedUris...");
422            Element inputList = eAction.getChild("input-events", eAction.getNamespace());
423            if (inputList != null) {
424                if (nonExistList.length() > 0) {
425                    checkListOfPaths(existList, nonExistList, conf);
426                }
427                return nonExistList.length() == 0;
428            }
429            return true;
430        }
431    
432        /**
433         * Check a list of non existed paths and add to exist list if it exists
434         *
435         * @param existList the list of existed paths
436         * @param nonExistList the list of paths to check existence
437         * @param conf action configuration
438         * @return true if all nonExistList paths exist
439         * @throws IOException thrown if unable to access the path
440         */
441        private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
442                throws IOException {
443    
444            String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
445            if (uriList[0] != null) {
446                LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
447            }
448    
449            nonExistList.delete(0, nonExistList.length());
450            boolean allExists = true;
451            String existSeparator = "", nonExistSeparator = "";
452            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
453            for (int i = 0; i < uriList.length; i++) {
454                if (allExists) {
455                    allExists = pathExists(uriList[i], conf, user);
456                    LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
457                }
458                if (allExists) {
459                    existList.append(existSeparator).append(uriList[i]);
460                    existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
461                }
462                else {
463                    nonExistList.append(nonExistSeparator).append(uriList[i]);
464                    nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
465                }
466            }
467            return allExists;
468        }
469    
470        /**
471         * Check if given path exists
472         *
473         * @param sPath uri path
474         * @param actionConf action configuration
475         * @return true if path exists
476         * @throws IOException thrown if unable to access the path
477         */
478        protected boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException {
479            LOG.debug("checking for the file " + sPath);
480            try {
481                URI uri = new URI(sPath);
482                URIHandlerService service = Services.get().get(URIHandlerService.class);
483                URIHandler handler = service.getURIHandler(uri);
484                return handler.exists(uri, actionConf, user);
485            }
486            catch (URIHandlerException e) {
487                coordAction.setErrorCode(e.getErrorCode().toString());
488                coordAction.setErrorMessage(e.getMessage());
489                throw new IOException(e);
490            } catch (URISyntaxException e) {
491                coordAction.setErrorCode(ErrorCode.E0906.toString());
492                coordAction.setErrorMessage(e.getMessage());
493                throw new IOException(e);
494            }
495        }
496    
497        /**
498         * The function create a list of URIs separated by "," using the instances time stamp and URI-template
499         *
500         * @param event : <data-in> event
501         * @param instances : List of time stamp seprated by ","
502         * @param unresolvedInstances : list of instance with latest/future function
503         * @return : list of URIs separated by ",".
504         * @throws Exception thrown if failed to create URIs from unresolvedInstances
505         */
506        @SuppressWarnings("unused")
507        private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
508            if (instances == null || instances.length() == 0) {
509                return "";
510            }
511            String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
512            StringBuilder uris = new StringBuilder();
513    
514            for (int i = 0; i < instanceList.length; i++) {
515                int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
516                if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
517                    if (unresolvedInstances.length() > 0) {
518                        unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
519                    }
520                    unresolvedInstances.append(instanceList[i]);
521                    continue;
522                }
523                ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
524                if (uris.length() > 0) {
525                    uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
526                }
527                uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
528                        "uri-template", event.getNamespace()).getTextTrim()));
529            }
530            return uris.toString();
531        }
532    
533        /**
534         * getting the error code of the coord action. (used mainly for unit testing)
535         */
536        protected String getCoordActionErrorCode() {
537            if (coordAction != null) {
538                return coordAction.getErrorCode();
539            }
540            return null;
541        }
542    
543        /**
544         * getting the error message of the coord action. (used mainly for unit testing)
545         */
546        protected String getCoordActionErrorMsg() {
547            if (coordAction != null) {
548                return coordAction.getErrorMessage();
549            }
550            return null;
551        }
552    
553        @Override
554        public String getEntityKey() {
555            return this.jobId;
556        }
557    
558        @Override
559        protected boolean isLockRequired() {
560            return true;
561        }
562    
563        @Override
564        protected void loadState() throws CommandException {
565            if (jpaService == null) {
566                jpaService = Services.get().get(JPAService.class);
567            }
568            try {
569                coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
570                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
571            }
572            catch (JPAExecutorException je) {
573                throw new CommandException(je);
574            }
575            LogUtils.setLogInfo(coordAction, logInfo);
576        }
577    
578        @Override
579        protected void verifyPrecondition() throws CommandException, PreconditionException {
580            if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
581                throw new PreconditionException(ErrorCode.E1100, "[" + actionId
582                        + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
583                        + coordAction.getStatus());
584            }
585    
586            // if eligible to do action input check when running with backward support is true
587            if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
588                return;
589            }
590    
591            if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED
592                    && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
593                throw new PreconditionException(
594                        ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
595                                    " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
596                                + coordJob.getStatus());
597            }
598        }
599    
600        @Override
601        public String getKey(){
602            return getName() + "_" + actionId;
603        }
604    
605    }