001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.util.Calendar;
024import java.util.Date;
025import java.util.List;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.security.AccessControlException;
028import org.apache.oozie.CoordinatorActionBean;
029import org.apache.oozie.CoordinatorJobBean;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.client.CoordinatorAction;
032import org.apache.oozie.client.Job;
033import org.apache.oozie.command.CommandException;
034import org.apache.oozie.command.PreconditionException;
035import org.apache.oozie.coord.CoordELEvaluator;
036import org.apache.oozie.coord.CoordELFunctions;
037import org.apache.oozie.coord.input.dependency.CoordInputDependency;
038import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
039import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
040import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
041import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
042import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
043import org.apache.oozie.executor.jpa.JPAExecutorException;
044import org.apache.oozie.service.CallableQueueService;
045import org.apache.oozie.service.ConfigurationService;
046import org.apache.oozie.service.EventHandlerService;
047import org.apache.oozie.service.JPAService;
048import org.apache.oozie.service.Service;
049import org.apache.oozie.service.Services;
050import org.apache.oozie.util.DateUtils;
051import org.apache.oozie.util.ELEvaluator;
052import org.apache.oozie.util.LogUtils;
053import org.apache.oozie.util.ParamChecker;
054import org.apache.oozie.util.StatusUtils;
055import org.apache.oozie.util.XConfiguration;
056import org.apache.oozie.util.XLog;
057import org.apache.oozie.util.XmlUtils;
058import org.jdom.Element;
059
060/**
061 * The command to check if an action's data input paths exist in the file system.
062 */
063public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
064
065    public static final String COORD_EXECUTION_NONE_TOLERANCE = "oozie.coord.execution.none.tolerance";
066
067    private final String actionId;
068    /**
069     * Property name of command re-queue interval for coordinator action input check in
070     * milliseconds.
071     */
072    public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
073            + "coord.input.check.requeue.interval";
074    public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY = Service.CONF_PREFIX
075            + "coord.input.check.requeue.interval.additional.delay";
076    private CoordinatorActionBean coordAction = null;
077    private CoordinatorJobBean coordJob = null;
078    private JPAService jpaService = null;
079    private String jobId = null;
080    public CoordActionInputCheckXCommand(String actionId, String jobId) {
081        super("coord_action_input", "coord_action_input", 1);
082        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
083        this.jobId = jobId;
084    }
085
086    @Override
087    protected void setLogInfo() {
088        LogUtils.setLogInfo(actionId);
089    }
090
091    @Override
092    protected Void execute() throws CommandException {
093        LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
094
095        // this action should only get processed if current time > nominal time;
096        // otherwise, requeue this action for delay execution;
097        Date nominalTime = coordAction.getNominalTime();
098        Date currentTime = new Date();
099        if (nominalTime.compareTo(currentTime) > 0) {
100            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), nominalTime.getTime()
101                    - currentTime.getTime());
102            updateCoordAction(coordAction, false);
103            LOG.info("[" + actionId
104                    + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
105                    + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
106
107            return null;
108        }
109
110        StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
111        boolean isChangeInDependency = false;
112        try {
113            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
114            Date now = new Date();
115            if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
116                Date nextNominalTime = CoordCommandUtils.computeNextNominalTime(coordJob, coordAction);
117                if (nextNominalTime != null) {
118                    // If the current time is after the next action's nominal time, then we've passed the window where this action
119                    // should be started; so set it to SKIPPED
120                    if (now.after(nextNominalTime)) {
121                        LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
122                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
123                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
124                        queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
125                        return null;
126                    } else {
127                        LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
128                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
129                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
130                    }
131                }
132            }
133            else if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.NONE)) {
134                // If the current time is after the nominal time of this action plus some tolerance,
135                // then we've passed the window where this action should be started; so set it to SKIPPED
136                Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
137                cal.setTime(nominalTime);
138                int tolerance = ConfigurationService.getInt(COORD_EXECUTION_NONE_TOLERANCE);
139                cal.add(Calendar.MINUTE, tolerance);
140                if (now.after(cal.getTime())) {
141                    LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is more than [{2}]"
142                            + " minutes later than the nominal time [{3}] of the current action]", coordAction.getId(),
143                            DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(nominalTime));
144                    queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
145                    return null;
146                } else {
147                    LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than [{2}]"
148                            + " minutes later than the nominal time [{3}] of the current action]", coordAction.getId(),
149                            DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(coordAction.getNominalTime()));
150                }
151            }
152
153            StringBuilder existList = new StringBuilder();
154            StringBuilder nonExistList = new StringBuilder();
155            CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
156            CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
157
158
159            String missingDependencies = coordPullInputDependency.getMissingDependencies();
160            StringBuilder nonResolvedList = new StringBuilder();
161
162            CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList);
163            String firstMissingDependency = "";
164            // For clarity regarding which is the missing dependency in synchronous order
165            // instead of printing entire list, some of which, may be available
166            if (nonExistList.length() > 0) {
167                firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
168            }
169            LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
170                    + nonResolvedList.toString());
171
172
173            boolean status = checkResolvedInput(actionXml, existList, nonExistList, actionConf);
174            boolean isPushDependenciesMet = coordPushInputDependency.isDependencyMet();
175            if (status && nonResolvedList.length() > 0) {
176                status = (isPushDependenciesMet) ? checkUnResolvedInput(actionXml, actionConf) : false;
177            }
178            coordAction.setLastModifiedTime(currentTime);
179            coordAction.setActionXml(actionXml.toString());
180
181            isChangeInDependency = isChangeInDependency(nonExistList, missingDependencies, nonResolvedList, status);
182
183            if (status && isPushDependenciesMet) {
184                String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId,
185                        coordPullInputDependency, coordPushInputDependency);
186                actionXml.replace(0, actionXml.length(), newActionXml);
187                coordAction.setActionXml(actionXml.toString());
188                coordAction.setStatus(CoordinatorAction.Status.READY);
189                updateCoordAction(coordAction, true);
190                new CoordActionReadyXCommand(coordAction.getJobId()).call();
191            }
192            else if (!isTimeout(currentTime)) {
193                if (!status) {
194                    long addtionalDelay = isChangeInDependency ? 0
195                            : ConfigurationService.getInt(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY)
196                                    * 1000L;
197                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
198                            addtionalDelay + getCoordInputCheckRequeueInterval());
199                }
200                updateCoordAction(coordAction, isChangeInDependency);
201            }
202            else {
203                if (isPushDependenciesMet) {
204                    queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
205                }
206                else {
207                    // Let CoordPushDependencyCheckXCommand queue the timeout
208                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
209                }
210                updateCoordAction(coordAction, isChangeInDependency);
211            }
212        }
213        catch (AccessControlException e) {
214            LOG.error("Permission error in ActionInputCheck", e);
215            if (isTimeout(currentTime)) {
216                LOG.debug("Queueing timeout command");
217                Services.get().get(CallableQueueService.class)
218                        .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
219            }
220            else {
221                // Requeue InputCheckCommand for permission denied error with longer interval
222                Services.get()
223                        .get(CallableQueueService.class)
224                        .queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
225                                2 * getCoordInputCheckRequeueInterval());
226            }
227            updateCoordAction(coordAction, isChangeInDependency);
228        }
229        catch (Exception e) {
230            if (isTimeout(currentTime)) {
231                LOG.debug("Queueing timeout command");
232                // XCommand.queue() will not work when there is a Exception
233                Services.get().get(CallableQueueService.class)
234                        .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
235            }
236            updateCoordAction(coordAction, isChangeInDependency);
237            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
238        }
239        return null;
240    }
241
242    private boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
243            StringBuilder nonResolvedList, boolean status) throws IOException {
244        if (nonResolvedList.length() > 0 && status == false) {
245            nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
246        }
247        return coordAction.getPullInputDependencies().isChangeInDependency(nonExistList, missingDependencies,
248                nonResolvedList, status);
249    }
250
251    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId)
252            throws Exception {
253        return resolveCoordConfiguration(actionXml, actionConf, actionId, null, null);
254    }
255
256    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId,
257            CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception {
258        Element eAction = XmlUtils.parseXml(actionXml.toString());
259        ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId, pullDependencies,
260                pushDependencies);
261        materializeDataProperties(eAction, actionConf, eval);
262        return XmlUtils.prettyPrint(eAction).toString();
263    }
264
265    private boolean isTimeout(Date currentTime) {
266        long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
267                .getCreatedTime().getTime()))
268                / (60 * 1000);
269        int timeOut = coordAction.getTimeOut();
270        return (timeOut >= 0) && (waitingTime > timeOut);
271    }
272
273    private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
274            throws CommandException {
275        coordAction.setLastModifiedTime(new Date());
276        if (jpaService != null) {
277            try {
278                if (isChangeInDependency) {
279                    coordAction.setMissingDependencies(coordAction.getPullInputDependencies().serialize());
280                    CoordActionQueryExecutor.getInstance().executeUpdate(
281                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, coordAction);
282                    if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
283                        // since event is not to be generated unless action
284                        // RUNNING via StartX
285                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
286                    }
287                }
288                else {
289                    CoordActionQueryExecutor.getInstance().executeUpdate(
290                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
291                }
292            }
293            catch (Exception jex) {
294                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
295            }
296        }
297    }
298    /**
299     * This function reads the value of re-queue interval for coordinator input
300     * check command from the Oozie configuration provided by Configuration
301     * Service. If nothing defined in the configuration, it uses the code
302     * specified default value.
303     *
304     * @return re-queue interval in ms
305     */
306    public long getCoordInputCheckRequeueInterval() {
307        long requeueInterval = ConfigurationService.getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL);
308        return requeueInterval;
309    }
310
311    /**
312     * To check the list of input paths if all of them exist
313     *
314     * @param actionXml action xml
315     * @param existList the list of existed paths
316     * @param nonExistList the list of non existed paths
317     * @param conf action configuration
318     * @return true if all input paths are existed
319     * @throws Exception thrown of unable to check input path
320     */
321    protected boolean checkResolvedInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
322            Configuration conf) throws Exception {
323        return coordAction.getPullInputDependencies().checkPullMissingDependencies(coordAction, existList,
324                nonExistList);
325    }
326
327    /**
328     * Check un resolved input.
329     *
330     * @param coordAction the coord action
331     * @param actionXml the action xml
332     * @param conf the conf
333     * @return true, if successful
334     * @throws Exception the exception
335     */
336    protected boolean checkUnResolvedInput(CoordinatorActionBean coordAction, StringBuilder actionXml,
337            Configuration conf) throws Exception {
338        Element eAction = XmlUtils.parseXml(actionXml.toString());
339        LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
340        boolean allExist = checkUnresolvedInstances(coordAction, eAction, conf);
341        if (allExist) {
342            actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
343        }
344        return allExist;
345    }
346
347    /**
348     * Check un resolved input.
349     *
350     * @param actionXml the action xml
351     * @param conf the conf
352     * @return true, if successful
353     * @throws Exception the exception
354     */
355    protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
356        return checkUnResolvedInput(coordAction, actionXml, conf);
357    }
358
359    /**
360     * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
361     * of files that will be needed.
362     *
363     * @param eAction action element
364     * @param conf action configuration
365     * @throws Exception thrown if failed to resolve data properties
366     * @update modify 'Action' element with appropriate list of files.
367     */
368    @SuppressWarnings("unchecked")
369    static void materializeDataProperties(Element eAction, Configuration conf, ELEvaluator eval) throws Exception {
370        Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
371                eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
372        if (configElem != null) {
373            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
374                resolveTagContents("value", propElem, eval);
375            }
376        }
377    }
378
379    /**
380     * To resolve property value which contains el functions
381     *
382     * @param tagName tag name
383     * @param elem the child element of "property" element
384     * @param eval el functions evaluator
385     * @throws Exception thrown if unable to resolve tag value
386     */
387    private static void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
388        if (elem == null) {
389            return;
390        }
391        Element tagElem = elem.getChild(tagName, elem.getNamespace());
392        if (tagElem != null) {
393            String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
394            tagElem.removeContent();
395            tagElem.addContent(updated);
396        }
397        else {
398            XLog.getLog(CoordActionInputCheckXCommand.class).warn(" Value NOT FOUND " + tagName);
399        }
400    }
401
402    /**
403     * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
404     *
405     * @param eAction action element
406     * @param actionConf action configuration
407     * @return true if successful to resolve input and output paths
408     * @throws Exception thrown if failed to resolve data input and output paths
409     */
410    @SuppressWarnings("unchecked")
411    private boolean checkUnresolvedInstances(CoordinatorActionBean coordAction, Element eAction,
412            Configuration actionConf) throws Exception {
413
414        boolean ret = coordAction.getPullInputDependencies().checkUnresolved(coordAction, eAction);
415
416        // Using latest() or future() in output-event is not intuitive.
417        // We need to make sure, this assumption is correct.
418        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
419        if (outputList != null) {
420            for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
421                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) != null) {
422                    throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
423                            " not permitted in output-event ");
424                }
425            }
426        }
427        return ret;
428    }
429
430    /**
431     * getting the error code of the coord action. (used mainly for unit testing)
432     */
433    protected String getCoordActionErrorCode() {
434        if (coordAction != null) {
435            return coordAction.getErrorCode();
436        }
437        return null;
438    }
439
440    /**
441     * getting the error message of the coord action. (used mainly for unit testing)
442     */
443    protected String getCoordActionErrorMsg() {
444        if (coordAction != null) {
445            return coordAction.getErrorMessage();
446        }
447        return null;
448    }
449
450    @Override
451    public String getEntityKey() {
452        return this.jobId;
453    }
454
455    @Override
456    protected boolean isLockRequired() {
457        return true;
458    }
459
460    @Override
461    protected void loadState() throws CommandException {
462        if (jpaService == null) {
463            jpaService = Services.get().get(JPAService.class);
464        }
465        try {
466            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
467            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_INPUT_CHECK,
468                    coordAction.getJobId());
469        }
470        catch (JPAExecutorException je) {
471            throw new CommandException(je);
472        }
473        LogUtils.setLogInfo(coordAction);
474    }
475
476    @Override
477    protected void verifyPrecondition() throws CommandException, PreconditionException {
478        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
479            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
480                    + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
481                    + coordAction.getStatus());
482        }
483
484        // if eligible to do action input check when running with backward support is true
485        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
486            return;
487        }
488
489        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED
490                && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
491            throw new PreconditionException(
492                    ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
493                                " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
494                            + coordJob.getStatus());
495        }
496    }
497
498    @Override
499    public String getKey(){
500        return getName() + "_" + actionId;
501    }
502
503}