001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.coord; 019 020import java.util.Date; 021import java.util.List; 022 023import org.apache.oozie.CoordinatorActionBean; 024import org.apache.oozie.CoordinatorActionInfo; 025import org.apache.oozie.CoordinatorJobBean; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.client.CoordinatorAction.Status; 028import org.apache.oozie.client.CoordinatorJob; 029import org.apache.oozie.command.CommandException; 030import org.apache.oozie.command.IgnoreTransitionXCommand; 031import org.apache.oozie.command.PreconditionException; 032import org.apache.oozie.coord.CoordUtils; 033import org.apache.oozie.executor.jpa.BatchQueryExecutor; 034import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 035import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 036import org.apache.oozie.executor.jpa.JPAExecutorException; 037import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 038import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 039import org.apache.oozie.util.LogUtils; 040import org.apache.oozie.util.ParamChecker; 041 042 043public class CoordActionsIgnoreXCommand extends IgnoreTransitionXCommand<CoordinatorActionInfo>{ 044 045 CoordinatorJobBean coordJob; 046 String jobId; 047 String type; 048 String scope; 049 private List<CoordinatorActionBean> coordActions; 050 051 public CoordActionsIgnoreXCommand(String coordId, String type, String scope) { 052 super("coord_action_ignore", "coord_action_ignore", 1); 053 this.jobId = ParamChecker.notEmpty(coordId, "coordJobId"); 054 this.type = ParamChecker.notEmpty(type, "type"); 055 this.scope = ParamChecker.notEmpty(scope, "scope"); 056 } 057 058 @Override 059 protected void verifyPrecondition() throws CommandException, PreconditionException { 060 // no actions to ignore for PREP job 061 if (coordJob.getStatus() == CoordinatorJob.Status.PREP) { 062 LOG.info("CoordActionsIgnoreXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId); 063 throw new PreconditionException(ErrorCode.E1024, "No actions are materialized to ignore"); 064 } 065 StringBuilder ineligibleActions = new StringBuilder(); 066 if (!checkAllActionsStatus(ineligibleActions)) { 067 throw new CommandException(ErrorCode.E1024, 068 "part or all actions are not eligible to ignore, check state of action number(s) [" 069 + ineligibleActions.toString() + "]"); 070 } 071 } 072 073 @Override 074 public void ignoreChildren() throws CommandException { 075 for (CoordinatorActionBean action : coordActions) { 076 action.setStatus(Status.IGNORED); 077 action.setLastModifiedTime(new Date()); 078 action.setPending(0); 079 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, 080 action)); 081 LOG.info("Ignore coord action = [{0}], new status = [{1}]", action.getId(), action.getStatus()); 082 } 083 ret = new CoordinatorActionInfo(coordActions); 084 } 085 086 private boolean checkAllActionsStatus(StringBuilder ineligibleActions) 087 throws CommandException { 088 boolean ret = true; 089 if (coordActions == null || coordActions.size() == 0) { 090 throw new CommandException(ErrorCode.E1024, "no actions are eligible to ignore"); 091 } 092 for (CoordinatorActionBean action : coordActions) { 093 ParamChecker.notNull(action, "Action cannot be null"); 094 if (!(action.getStatus() == Status.FAILED || action.getStatus() == Status.KILLED 095 || action.getStatus() == Status.TIMEDOUT)) { 096 LOG.info("Cannot ignore coord action = [{0}], since its status is [{1}]", action.getId(), 097 action.getStatus()); 098 if (ineligibleActions.length() != 0) { 099 ineligibleActions.append(","); 100 } 101 ineligibleActions.append(action.getActionNumber()); 102 ret = false; 103 } 104 } 105 return ret; 106 } 107 108 @Override 109 public void performWrites() throws CommandException { 110 try { 111 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 112 } 113 catch (JPAExecutorException jex) { 114 throw new CommandException(jex); 115 } 116 } 117 118 @Override 119 protected boolean isLockRequired() { 120 return true; 121 } 122 123 @Override 124 public String getEntityKey() { 125 return jobId; 126 } 127 128 @Override 129 protected void loadState() throws CommandException { 130 try{ 131 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_STATUS_PARENTID, jobId); 132 coordActions = CoordUtils.getCoordActions(type, jobId, scope, false); 133 }catch (Exception ex){ 134 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); 135 } 136 LogUtils.setLogInfo(this.coordJob, logInfo); 137 } 138}