001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.Collection;
023    import java.util.List;
024    import org.apache.oozie.command.CommandException;
025    import org.apache.oozie.dependency.DependencyChecker;
026    import org.apache.oozie.service.PartitionDependencyManagerService;
027    import org.apache.oozie.service.Services;
028    
029    public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyCheckXCommand {
030    
031        public CoordActionUpdatePushMissingDependency(String actionId) {
032            super("coord_action_push_md", actionId);
033        }
034    
035        @Override
036        protected Void execute() throws CommandException {
037            LOG.info("STARTED for Action id [{0}]", actionId);
038            String pushMissingDeps = coordAction.getPushMissingDependencies();
039            if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
040                LOG.info("Nothing to check. Empty push missing dependency");
041            }
042            else {
043                PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
044                Collection<String> availDepList = pdms.getAvailableDependencyURIs(actionId);
045                if (availDepList == null || availDepList.size() == 0) {
046                    LOG.info("There are no available dependencies");
047                    if (isTimeout()) { // Poll and check as one last try
048                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
049                    }
050                }
051                else {
052                    LOG.debug("Updating with available uris=[{0}] where missing uris=[{1}]", availDepList.toString(),
053                            pushMissingDeps);
054    
055                    String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
056                    List<String> stillMissingDepsList = new ArrayList<String>(Arrays.asList(missingDepsArray));
057                    stillMissingDepsList.removeAll(availDepList);
058                    boolean isChangeInDependency = true;
059                    if (stillMissingDepsList.size() == 0) {
060                        // All push-based dependencies are available
061                        onAllPushDependenciesAvailable();
062                    }
063                    else {
064                        if (stillMissingDepsList.size() == missingDepsArray.length) {
065                            isChangeInDependency = false;
066                        }
067                        else {
068                            String stillMissingDeps = DependencyChecker.dependenciesAsString(stillMissingDepsList);
069                            coordAction.setPushMissingDependencies(stillMissingDeps);
070                        }
071                        if (isTimeout()) { // Poll and check as one last try
072                            queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
073                        }
074                    }
075                    updateCoordAction(coordAction, isChangeInDependency);
076                    removeAvailableDependencies(pdms, availDepList);
077                    LOG.info("ENDED for Action id [{0}]", actionId);
078                }
079            }
080            return null;
081        }
082    
083        private void removeAvailableDependencies(PartitionDependencyManagerService pdms, Collection<String> availDepList) {
084            if (pdms.removeAvailableDependencyURIs(actionId, availDepList)) {
085                LOG.debug("Successfully removed uris [{0}] from available list", availDepList.toString());
086            }
087            else {
088                LOG.warn("Failed to remove uris [{0}] from available list", availDepList.toString(), actionId);
089            }
090        }
091    
092        @Override
093        public String getEntityKey() {
094            return actionId;
095        }
096    
097    }