001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.util.Collection; 022import org.apache.oozie.command.CommandException; 023import org.apache.oozie.coord.input.dependency.CoordInputDependency; 024import org.apache.oozie.dependency.DependencyChecker; 025import org.apache.oozie.service.PartitionDependencyManagerService; 026import org.apache.oozie.service.Services; 027 028public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyCheckXCommand { 029 030 public CoordActionUpdatePushMissingDependency(String actionId) { 031 super("coord_action_push_md", actionId); 032 } 033 034 @Override 035 protected Void execute() throws CommandException { 036 CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies(); 037 CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies(); 038 039 LOG.info("STARTED for Action id [{0}]", actionId); 040 if (coordPushInputDependency.isDependencyMet()) { 041 LOG.info("Nothing to check. Empty push missing dependency"); 042 } 043 else { 044 PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class); 045 Collection<String> availDepList = pdms.getAvailableDependencyURIs(actionId); 046 if (availDepList == null || availDepList.size() == 0) { 047 LOG.info("There are no available dependencies"); 048 if (isTimeout()) { // Poll and check as one last try 049 queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100); 050 } 051 } 052 else { 053 String pushMissingDependencies = coordPushInputDependency.getMissingDependencies().toString(); 054 LOG.debug("Updating with available uris = [{0}] where missing uris = [{1}]", pushMissingDependencies); 055 String[] missingDependenciesArray = DependencyChecker.dependenciesAsArray(pushMissingDependencies); 056 coordPushInputDependency.addToAvailableDependencies(availDepList); 057 boolean isChangeInDependency = true; 058 if (coordPushInputDependency.isDependencyMet()) { 059 // All push-based dependencies are available 060 onAllPushDependenciesAvailable(coordPullInputDependency.isDependencyMet()); 061 } 062 else { 063 if (coordPushInputDependency.getMissingDependenciesAsList().size() == missingDependenciesArray.length) { 064 isChangeInDependency = false; 065 } 066 if (isTimeout()) { // Poll and check as one last try 067 queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100); 068 } 069 } 070 updateCoordAction(coordAction, isChangeInDependency); 071 removeAvailableDependencies(pdms, availDepList); 072 LOG.info("ENDED for Action id [{0}]", actionId); 073 } 074 } 075 return null; 076 } 077 078 private void removeAvailableDependencies(PartitionDependencyManagerService pdms, Collection<String> availDepList) { 079 if (pdms.removeAvailableDependencyURIs(actionId, availDepList)) { 080 LOG.debug("Successfully removed uris [{0}] from available list", availDepList.toString()); 081 } 082 else { 083 LOG.warn("Failed to remove uris [{0}] from available list", availDepList.toString(), actionId); 084 } 085 } 086 087 @Override 088 public String getEntityKey() { 089 return actionId; 090 } 091 092}