001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.List;
024import org.apache.oozie.command.CommandException;
025import org.apache.oozie.dependency.DependencyChecker;
026import org.apache.oozie.service.PartitionDependencyManagerService;
027import org.apache.oozie.service.Services;
028
029public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyCheckXCommand {
030
031    public CoordActionUpdatePushMissingDependency(String actionId) {
032        super("coord_action_push_md", actionId);
033    }
034
035    @Override
036    protected Void execute() throws CommandException {
037        LOG.info("STARTED for Action id [{0}]", actionId);
038        String pushMissingDeps = coordAction.getPushMissingDependencies();
039        if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
040            LOG.info("Nothing to check. Empty push missing dependency");
041        }
042        else {
043            PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
044            Collection<String> availDepList = pdms.getAvailableDependencyURIs(actionId);
045            if (availDepList == null || availDepList.size() == 0) {
046                LOG.info("There are no available dependencies");
047                if (isTimeout()) { // Poll and check as one last try
048                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
049                }
050            }
051            else {
052                LOG.debug("Updating with available uris=[{0}] where missing uris=[{1}]", availDepList.toString(),
053                        pushMissingDeps);
054
055                String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
056                List<String> stillMissingDepsList = new ArrayList<String>(Arrays.asList(missingDepsArray));
057                stillMissingDepsList.removeAll(availDepList);
058                boolean isChangeInDependency = true;
059                if (stillMissingDepsList.size() == 0) {
060                    // All push-based dependencies are available
061                    onAllPushDependenciesAvailable();
062                }
063                else {
064                    if (stillMissingDepsList.size() == missingDepsArray.length) {
065                        isChangeInDependency = false;
066                    }
067                    else {
068                        String stillMissingDeps = DependencyChecker.dependenciesAsString(stillMissingDepsList);
069                        coordAction.setPushMissingDependencies(stillMissingDeps);
070                    }
071                    if (isTimeout()) { // Poll and check as one last try
072                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
073                    }
074                }
075                updateCoordAction(coordAction, isChangeInDependency);
076                removeAvailableDependencies(pdms, availDepList);
077                LOG.info("ENDED for Action id [{0}]", actionId);
078            }
079        }
080        return null;
081    }
082
083    private void removeAvailableDependencies(PartitionDependencyManagerService pdms, Collection<String> availDepList) {
084        if (pdms.removeAvailableDependencyURIs(actionId, availDepList)) {
085            LOG.debug("Successfully removed uris [{0}] from available list", availDepList.toString());
086        }
087        else {
088            LOG.warn("Failed to remove uris [{0}] from available list", availDepList.toString(), actionId);
089        }
090    }
091
092    @Override
093    public String getEntityKey() {
094        return actionId;
095    }
096
097}