001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.net.URI;
024import java.util.Date;
025import java.util.List;
026
027import org.apache.commons.lang.StringUtils;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.oozie.CoordinatorActionBean;
030import org.apache.oozie.CoordinatorJobBean;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.client.CoordinatorAction;
033import org.apache.oozie.client.Job;
034import org.apache.oozie.client.OozieClient;
035import org.apache.oozie.command.CommandException;
036import org.apache.oozie.command.PreconditionException;
037import org.apache.oozie.coord.input.dependency.CoordInputDependency;
038import org.apache.oozie.dependency.ActionDependency;
039import org.apache.oozie.dependency.DependencyChecker;
040import org.apache.oozie.dependency.URIHandler;
041import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
042import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
043import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
044import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
045import org.apache.oozie.executor.jpa.JPAExecutorException;
046import org.apache.oozie.service.CallableQueueService;
047import org.apache.oozie.service.ConfigurationService;
048import org.apache.oozie.service.EventHandlerService;
049import org.apache.oozie.service.JPAService;
050import org.apache.oozie.service.PartitionDependencyManagerService;
051import org.apache.oozie.service.RecoveryService;
052import org.apache.oozie.service.Service;
053import org.apache.oozie.service.Services;
054import org.apache.oozie.service.URIHandlerService;
055import org.apache.oozie.util.LogUtils;
056import org.apache.oozie.util.StatusUtils;
057import org.apache.oozie.util.XConfiguration;
058import org.apache.oozie.util.XLog;
059import org.apache.oozie.util.DateUtils;
060
061public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
062    protected String actionId;
063    protected JPAService jpaService = null;
064    protected CoordinatorActionBean coordAction = null;
065    protected CoordinatorJobBean coordJob = null;
066
067    /**
068     * Property name of command re-queue interval for coordinator push check in
069     * milliseconds.
070     */
071    public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
072            + "coord.push.check.requeue.interval";
073    private boolean registerForNotification;
074    private boolean removeAvailDependencies;
075
076    public CoordPushDependencyCheckXCommand(String actionId) {
077        this(actionId, false, true);
078    }
079
080    public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) {
081        this(actionId, registerForNotification, !registerForNotification);
082    }
083
084    public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification,
085            boolean removeAvailDependencies) {
086        super("coord_push_dep_check", "coord_push_dep_check", 0);
087        this.actionId = actionId;
088        this.registerForNotification = registerForNotification;
089        this.removeAvailDependencies = removeAvailDependencies;
090    }
091
092    protected CoordPushDependencyCheckXCommand(String actionName, String actionId) {
093        super(actionName, actionName, 0);
094        this.actionId = actionId;
095    }
096
097    @Override
098    protected void setLogInfo() {
099        LogUtils.setLogInfo(actionId);
100    }
101
102    @Override
103    protected Void execute() throws CommandException {
104        // this action should only get processed if current time > nominal time;
105        // otherwise, requeue this action for delay execution;
106        Date nominalTime = coordAction.getNominalTime();
107        Date currentTime = new Date();
108        if (nominalTime.compareTo(currentTime) > 0) {
109            queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), nominalTime.getTime() - currentTime.getTime());
110            updateCoordAction(coordAction, false);
111            LOG.info("[" + actionId
112                    + "]::CoordPushDependency:: nominal Time is newer than current time, so requeue and wait. Current="
113                    + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
114            return null;
115        }
116
117        CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
118        CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
119        if (coordPushInputDependency.getMissingDependenciesAsList().size() == 0) {
120            LOG.info("Nothing to check. Empty push missing dependency");
121        }
122        else {
123            List<String> missingDependenciesArray = coordPushInputDependency.getMissingDependenciesAsList();
124            LOG.info("First Push missing dependency is [{0}] ", missingDependenciesArray.get(0));
125            LOG.trace("Push missing dependencies are [{0}] ", missingDependenciesArray);
126            if (registerForNotification) {
127                LOG.debug("Register for notifications is true");
128            }
129
130            try {
131                Configuration actionConf = null;
132                try {
133                    actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
134                }
135                catch (IOException e) {
136                    throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
137                }
138
139
140                boolean isChangeInDependency = true;
141                boolean timeout = false;
142                ActionDependency actionDependency = coordPushInputDependency.checkPushMissingDependencies(coordAction,
143                        registerForNotification);
144                // Check all dependencies during materialization to avoid registering in the cache.
145                // But check only first missing one afterwards similar to
146                // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
147                if (actionDependency.getMissingDependencies().size() == missingDependenciesArray.size()) {
148                    isChangeInDependency = false;
149                }
150                else {
151                    String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDependency.getMissingDependencies());
152                    coordPushInputDependency.setMissingDependencies(stillMissingDeps);
153                }
154
155                if (coordPushInputDependency.isDependencyMet()) {
156                    // All push-based dependencies are available
157                    onAllPushDependenciesAvailable(coordPullInputDependency.isDependencyMet());
158                }
159                else {
160                    // Checking for timeout
161                    timeout = isTimeout();
162                    if (timeout) {
163                        queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
164                    }
165                    else {
166                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
167                                getCoordPushCheckRequeueInterval());
168                    }
169                }
170
171                updateCoordAction(coordAction, isChangeInDependency || coordPushInputDependency.isDependencyMet());
172                if (registerForNotification) {
173                    registerForNotification(coordPushInputDependency.getMissingDependenciesAsList(), actionConf);
174                }
175                if (removeAvailDependencies) {
176                    unregisterAvailableDependencies(actionDependency.getAvailableDependencies());
177                }
178                if (timeout) {
179                    unregisterMissingDependencies(coordPushInputDependency.getMissingDependenciesAsList(), actionId);
180                }
181            }
182            catch (Exception e) {
183                final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
184                if (isTimeout()) {
185                    LOG.debug("Queueing timeout command");
186                    // XCommand.queue() will not work when there is a Exception
187                    callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
188                    unregisterMissingDependencies(missingDependenciesArray, actionId);
189                }
190                else if (coordPullInputDependency.getMissingDependenciesAsList().size() > 0) {
191                    // Queue again on exception as RecoveryService will not queue this again with
192                    // the action being updated regularly by CoordActionInputCheckXCommand
193                    callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
194                            registerForNotification, removeAvailDependencies),
195                            Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
196                }
197                throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
198            }
199        }
200        return null;
201    }
202
203    /**
204     * Return the re-queue interval for coord push dependency check
205     * @return
206     */
207    public long getCoordPushCheckRequeueInterval() {
208        long requeueInterval = ConfigurationService.getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL);
209        return requeueInterval;
210    }
211
212    /**
213     * Returns true if timeout period has been reached
214     *
215     * @return true if it is time for timeout else false
216     */
217    protected boolean isTimeout() {
218        long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
219                .getCreatedTime().getTime()))
220                / (60 * 1000);
221        int timeOut = coordAction.getTimeOut();
222        return (timeOut >= 0) && (waitingTime > timeOut);
223    }
224
225    protected void onAllPushDependenciesAvailable(boolean isPullDependencyMeet) throws CommandException {
226        Services.get().get(PartitionDependencyManagerService.class)
227                .removeCoordActionWithDependenciesAvailable(coordAction.getId());
228        if (isPullDependencyMeet) {
229            Date nominalTime = coordAction.getNominalTime();
230            Date currentTime = new Date();
231            // The action should become READY only if current time > nominal time;
232            // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
233            if (nominalTime.compareTo(currentTime) > 0) {
234                LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
235                        + DateUtils.formatDateOozieTZ(currentTime) + ", nominal="
236                        + DateUtils.formatDateOozieTZ(nominalTime));
237            }
238            else {
239                String actionXml = resolveCoordConfiguration();
240                coordAction.setActionXml(actionXml);
241                coordAction.setStatus(CoordinatorAction.Status.READY);
242                // pass jobID to the CoordActionReadyXCommand
243                queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
244            }
245        }
246        else if (isTimeout()) {
247            // If it is timeout and all push dependencies are available but still some unresolved
248            // missing dependencies queue CoordActionInputCheckXCommand now. Else it will have to
249            // wait till RecoveryService kicks in
250            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()));
251        }
252        coordAction.getPushInputDependencies().setDependencyMet(true);
253
254    }
255
256    private String resolveCoordConfiguration() throws CommandException {
257        try {
258            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
259            StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
260            String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
261                    actionId, coordAction.getPullInputDependencies(), coordAction
262                            .getPushInputDependencies());
263            actionXml.replace(0, actionXml.length(), newActionXml);
264            return actionXml.toString();
265        }
266        catch (Exception e) {
267            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
268        }
269    }
270
271    protected void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
272            throws CommandException {
273        coordAction.setLastModifiedTime(new Date());
274        if (jpaService != null) {
275            try {
276                if (isChangeInDependency) {
277                    coordAction.setPushMissingDependencies(coordAction.getPushInputDependencies().serialize());
278                    CoordActionQueryExecutor.getInstance().executeUpdate(
279                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, coordAction);
280                    if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
281                        // since event is not to be generated unless action
282                        // RUNNING via StartX
283                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
284                    }
285                }
286                else {
287                    CoordActionQueryExecutor.getInstance().executeUpdate(
288                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
289                }
290            }
291            catch (JPAExecutorException jex) {
292                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
293            }
294            catch (IOException ioe) {
295                throw new CommandException(ErrorCode.E1021, ioe.getMessage(), ioe);
296            }
297        }
298    }
299
300    private void registerForNotification(List<String> missingDeps, Configuration actionConf) {
301        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
302        String user = actionConf.get(OozieClient.USER_NAME, OozieClient.USER_NAME);
303        for (String missingDep : missingDeps) {
304            try {
305                URI missingURI = new URI(missingDep);
306                URIHandler handler = uriService.getURIHandler(missingURI);
307                handler.registerForNotification(missingURI, actionConf, user, actionId);
308                    LOG.debug("Registered uri [{0}] for notifications", missingURI);
309            }
310            catch (Exception e) {
311                LOG.warn("Exception while registering uri [{0}] for notifications", missingDep, e);
312            }
313        }
314    }
315
316    private void unregisterAvailableDependencies(List<String> availableDeps) {
317        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
318        for (String availableDep : availableDeps) {
319            try {
320                URI availableURI = new URI(availableDep);
321                URIHandler handler = uriService.getURIHandler(availableURI);
322                if (handler.unregisterFromNotification(availableURI, actionId)) {
323                    LOG.debug("Successfully unregistered uri [{0}] from notifications", availableURI);
324                }
325                else {
326                    LOG.warn("Unable to unregister uri [{0}] from notifications", availableURI);
327                }
328            }
329            catch (Exception e) {
330                LOG.warn("Exception while unregistering uri [{0}] from notifications", availableDep, e);
331            }
332        }
333    }
334
335    public static void unregisterMissingDependencies(List<String> missingDeps, String actionId) {
336        final XLog LOG = XLog.getLog(CoordPushDependencyCheckXCommand.class);
337        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
338        for (String missingDep : missingDeps) {
339            try {
340                URI missingURI = new URI(missingDep);
341                URIHandler handler = uriService.getURIHandler(missingURI);
342                if (handler.unregisterFromNotification(missingURI, actionId)) {
343                    LOG.debug("Successfully unregistered uri [{0}] from notifications", missingURI);
344                }
345                else {
346                    LOG.warn("Unable to unregister uri [{0}] from notifications", missingURI);
347                }
348            }
349            catch (Exception e) {
350                LOG.warn("Exception while unregistering uri [{0}] from notifications", missingDep, e);
351            }
352        }
353    }
354
355    @Override
356    public String getEntityKey() {
357        return actionId.substring(0, actionId.indexOf("@"));
358    }
359
360    @Override
361    public String getKey(){
362        return getName() + "_" + actionId;
363    }
364
365    @Override
366    protected boolean isLockRequired() {
367        return true;
368    }
369
370    @Override
371    protected void loadState() throws CommandException {
372        jpaService = Services.get().get(JPAService.class);
373        try {
374            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
375            if (coordAction != null) {
376                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
377                LogUtils.setLogInfo(coordAction);
378            }
379            else {
380                throw new CommandException(ErrorCode.E0605, actionId);
381            }
382        }
383        catch (JPAExecutorException je) {
384            throw new CommandException(je);
385        }
386    }
387
388    @Override
389    protected void verifyPrecondition() throws CommandException, PreconditionException {
390        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
391            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
392                    + "]::CoordPushDependencyCheck:: Ignoring action. Should be in WAITING state, but state="
393                    + coordAction.getStatus());
394        }
395
396        // if eligible to do action input check when running with backward
397        // support is true
398        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
399            return;
400        }
401
402        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
403                && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
404            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
405                    + "]::CoordPushDependencyCheck:: Ignoring action."
406                    + " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
407                    + coordJob.getStatus());
408        }
409    }
410
411}