001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.util.ArrayList; 021 import java.util.Arrays; 022 import java.util.Collection; 023 import java.util.List; 024 import org.apache.oozie.command.CommandException; 025 import org.apache.oozie.dependency.DependencyChecker; 026 import org.apache.oozie.service.PartitionDependencyManagerService; 027 import org.apache.oozie.service.Services; 028 029 public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyCheckXCommand { 030 031 public CoordActionUpdatePushMissingDependency(String actionId) { 032 super("coord_action_push_md", actionId); 033 } 034 035 @Override 036 protected Void execute() throws CommandException { 037 LOG.info("STARTED for Action id [{0}]", actionId); 038 String pushMissingDeps = coordAction.getPushMissingDependencies(); 039 if (pushMissingDeps == null || pushMissingDeps.length() == 0) { 040 LOG.info("Nothing to check. Empty push missing dependency"); 041 } 042 else { 043 PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class); 044 Collection<String> availDepList = pdms.getAvailableDependencyURIs(actionId); 045 if (availDepList == null || availDepList.size() == 0) { 046 LOG.info("There are no available dependencies"); 047 if (isTimeout()) { // Poll and check as one last try 048 queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100); 049 } 050 } 051 else { 052 LOG.debug("Updating with available uris=[{0}] where missing uris=[{1}]", availDepList.toString(), 053 pushMissingDeps); 054 055 String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps); 056 List<String> stillMissingDepsList = new ArrayList<String>(Arrays.asList(missingDepsArray)); 057 stillMissingDepsList.removeAll(availDepList); 058 boolean isChangeInDependency = true; 059 if (stillMissingDepsList.size() == 0) { 060 // All push-based dependencies are available 061 onAllPushDependenciesAvailable(); 062 } 063 else { 064 if (stillMissingDepsList.size() == missingDepsArray.length) { 065 isChangeInDependency = false; 066 } 067 else { 068 String stillMissingDeps = DependencyChecker.dependenciesAsString(stillMissingDepsList); 069 coordAction.setPushMissingDependencies(stillMissingDeps); 070 } 071 if (isTimeout()) { // Poll and check as one last try 072 queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100); 073 } 074 } 075 updateCoordAction(coordAction, isChangeInDependency); 076 removeAvailableDependencies(pdms, availDepList); 077 LOG.info("ENDED for Action id [{0}]", actionId); 078 } 079 } 080 return null; 081 } 082 083 private void removeAvailableDependencies(PartitionDependencyManagerService pdms, Collection<String> availDepList) { 084 if (pdms.removeAvailableDependencyURIs(actionId, availDepList)) { 085 LOG.debug("Successfully removed uris [{0}] from available list", availDepList.toString()); 086 } 087 else { 088 LOG.warn("Failed to remove uris [{0}] from available list", availDepList.toString(), actionId); 089 } 090 } 091 092 @Override 093 public String getEntityKey() { 094 return actionId; 095 } 096 097 }