001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.util.Date; 022import java.util.List; 023 024import org.apache.oozie.CoordinatorActionBean; 025import org.apache.oozie.CoordinatorActionInfo; 026import org.apache.oozie.CoordinatorJobBean; 027import org.apache.oozie.ErrorCode; 028import org.apache.oozie.client.CoordinatorAction.Status; 029import org.apache.oozie.client.CoordinatorJob; 030import org.apache.oozie.command.CommandException; 031import org.apache.oozie.command.IgnoreTransitionXCommand; 032import org.apache.oozie.command.PreconditionException; 033import org.apache.oozie.coord.CoordUtils; 034import org.apache.oozie.executor.jpa.BatchQueryExecutor; 035import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 036import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 037import org.apache.oozie.executor.jpa.JPAExecutorException; 038import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 039import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 040import org.apache.oozie.util.LogUtils; 041import org.apache.oozie.util.ParamChecker; 042 043 044public class CoordActionsIgnoreXCommand extends IgnoreTransitionXCommand<CoordinatorActionInfo>{ 045 046 CoordinatorJobBean coordJob; 047 String jobId; 048 String type; 049 String scope; 050 private List<CoordinatorActionBean> coordActions; 051 052 public CoordActionsIgnoreXCommand(String coordId, String type, String scope) { 053 super("coord_action_ignore", "coord_action_ignore", 1); 054 this.jobId = ParamChecker.notEmpty(coordId, "coordJobId"); 055 this.type = ParamChecker.notEmpty(type, "type"); 056 this.scope = ParamChecker.notEmpty(scope, "scope"); 057 } 058 059 @Override 060 protected void verifyPrecondition() throws CommandException, PreconditionException { 061 // no actions to ignore for PREP job 062 if (coordJob.getStatus() == CoordinatorJob.Status.PREP) { 063 LOG.info("CoordActionsIgnoreXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId); 064 throw new PreconditionException(ErrorCode.E1024, "No actions are materialized to ignore"); 065 } 066 StringBuilder ineligibleActions = new StringBuilder(); 067 if (!checkAllActionsStatus(ineligibleActions)) { 068 throw new CommandException(ErrorCode.E1024, 069 "part or all actions are not eligible to ignore, check state of action number(s) [" 070 + ineligibleActions.toString() + "]"); 071 } 072 } 073 074 @Override 075 public void ignoreChildren() throws CommandException { 076 for (CoordinatorActionBean action : coordActions) { 077 action.setStatus(Status.IGNORED); 078 action.setLastModifiedTime(new Date()); 079 action.setPending(0); 080 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, 081 action)); 082 LOG.info("Ignore coord action = [{0}], new status = [{1}]", action.getId(), action.getStatus()); 083 } 084 ret = new CoordinatorActionInfo(coordActions); 085 } 086 087 private boolean checkAllActionsStatus(StringBuilder ineligibleActions) 088 throws CommandException { 089 boolean ret = true; 090 if (coordActions == null || coordActions.size() == 0) { 091 throw new CommandException(ErrorCode.E1024, "no actions are eligible to ignore"); 092 } 093 for (CoordinatorActionBean action : coordActions) { 094 ParamChecker.notNull(action, "Action"); 095 if (!(action.getStatus() == Status.FAILED || action.getStatus() == Status.KILLED 096 || action.getStatus() == Status.TIMEDOUT)) { 097 LOG.info("Cannot ignore coord action = [{0}], since its status is [{1}]", action.getId(), 098 action.getStatus()); 099 if (ineligibleActions.length() != 0) { 100 ineligibleActions.append(","); 101 } 102 ineligibleActions.append(action.getActionNumber()); 103 ret = false; 104 } 105 } 106 return ret; 107 } 108 109 @Override 110 public void performWrites() throws CommandException { 111 try { 112 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 113 } 114 catch (JPAExecutorException jex) { 115 throw new CommandException(jex); 116 } 117 } 118 119 @Override 120 protected boolean isLockRequired() { 121 return true; 122 } 123 124 @Override 125 public String getEntityKey() { 126 return jobId; 127 } 128 129 @Override 130 protected void loadState() throws CommandException { 131 try{ 132 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_STATUS_PARENTID, jobId); 133 coordActions = CoordUtils.getCoordActions(type, jobId, scope, false); 134 }catch (Exception ex){ 135 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); 136 } 137 LogUtils.setLogInfo(this.coordJob); 138 } 139}