001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.util.Collection;
022import org.apache.oozie.command.CommandException;
023import org.apache.oozie.coord.input.dependency.CoordInputDependency;
024import org.apache.oozie.dependency.DependencyChecker;
025import org.apache.oozie.service.PartitionDependencyManagerService;
026import org.apache.oozie.service.Services;
027
028public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyCheckXCommand {
029
030    public CoordActionUpdatePushMissingDependency(String actionId) {
031        super("coord_action_push_md", actionId);
032    }
033
034    @Override
035    protected Void execute() throws CommandException {
036        CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
037        CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
038
039        LOG.info("STARTED for Action id [{0}]", actionId);
040        if (coordPushInputDependency.isDependencyMet()) {
041            LOG.info("Nothing to check. Empty push missing dependency");
042        }
043        else {
044            PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
045            Collection<String> availDepList = pdms.getAvailableDependencyURIs(actionId);
046            if (availDepList == null || availDepList.size() == 0) {
047                LOG.info("There are no available dependencies");
048                if (isTimeout()) { // Poll and check as one last try
049                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
050                }
051            }
052            else {
053                String pushMissingDependencies = coordPushInputDependency.getMissingDependencies().toString();
054                LOG.debug("Updating with available uris = [{0}] where missing uris = [{1}]", pushMissingDependencies);
055                String[] missingDependenciesArray = DependencyChecker.dependenciesAsArray(pushMissingDependencies);
056                coordPushInputDependency.addToAvailableDependencies(availDepList);
057                boolean isChangeInDependency = true;
058                if (coordPushInputDependency.isDependencyMet()) {
059                    // All push-based dependencies are available
060                    onAllPushDependenciesAvailable(coordPullInputDependency.isDependencyMet());
061                }
062                else {
063                    if (coordPushInputDependency.getMissingDependenciesAsList().size() == missingDependenciesArray.length) {
064                        isChangeInDependency = false;
065                    }
066                    if (isTimeout()) { // Poll and check as one last try
067                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
068                    }
069                }
070                updateCoordAction(coordAction, isChangeInDependency);
071                removeAvailableDependencies(pdms, availDepList);
072                LOG.info("ENDED for Action id [{0}]", actionId);
073            }
074        }
075        return null;
076    }
077
078    private void removeAvailableDependencies(PartitionDependencyManagerService pdms, Collection<String> availDepList) {
079        if (pdms.removeAvailableDependencyURIs(actionId, availDepList)) {
080            LOG.debug("Successfully removed uris [{0}] from available list", availDepList.toString());
081        }
082        else {
083            LOG.warn("Failed to remove uris [{0}] from available list", availDepList.toString(), actionId);
084        }
085    }
086
087    @Override
088    public String getEntityKey() {
089        return actionId;
090    }
091
092}