001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.util.Date;
022import java.util.List;
023
024import org.apache.oozie.CoordinatorActionBean;
025import org.apache.oozie.CoordinatorActionInfo;
026import org.apache.oozie.CoordinatorJobBean;
027import org.apache.oozie.ErrorCode;
028import org.apache.oozie.client.CoordinatorAction.Status;
029import org.apache.oozie.client.CoordinatorJob;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.command.IgnoreTransitionXCommand;
032import org.apache.oozie.command.PreconditionException;
033import org.apache.oozie.coord.CoordUtils;
034import org.apache.oozie.executor.jpa.BatchQueryExecutor;
035import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
036import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
037import org.apache.oozie.executor.jpa.JPAExecutorException;
038import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
039import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
040import org.apache.oozie.util.LogUtils;
041import org.apache.oozie.util.ParamChecker;
042
043
044public class CoordActionsIgnoreXCommand extends IgnoreTransitionXCommand<CoordinatorActionInfo>{
045
046    CoordinatorJobBean coordJob;
047    String jobId;
048    String type;
049    String scope;
050    private List<CoordinatorActionBean> coordActions;
051
052    public CoordActionsIgnoreXCommand(String coordId, String type, String scope) {
053        super("coord_action_ignore", "coord_action_ignore", 1);
054        this.jobId = ParamChecker.notEmpty(coordId, "coordJobId");
055        this.type = ParamChecker.notEmpty(type, "type");
056        this.scope = ParamChecker.notEmpty(scope, "scope");
057    }
058
059    @Override
060    protected void verifyPrecondition() throws CommandException, PreconditionException {
061        // no actions to ignore for PREP job
062        if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
063            LOG.info("CoordActionsIgnoreXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
064            throw new PreconditionException(ErrorCode.E1024, "No actions are materialized to ignore");
065        }
066        StringBuilder ineligibleActions = new StringBuilder();
067        if (!checkAllActionsStatus(ineligibleActions)) {
068            throw new CommandException(ErrorCode.E1024,
069                    "part or all actions are not eligible to ignore, check state of action number(s) ["
070                            + ineligibleActions.toString() + "]");
071        }
072    }
073
074    @Override
075    public void ignoreChildren() throws CommandException {
076        for (CoordinatorActionBean action : coordActions) {
077            action.setStatus(Status.IGNORED);
078            action.setLastModifiedTime(new Date());
079            action.setPending(0);
080            updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
081                    action));
082            LOG.info("Ignore coord action = [{0}], new status = [{1}]", action.getId(), action.getStatus());
083        }
084        ret = new CoordinatorActionInfo(coordActions);
085    }
086
087    private boolean checkAllActionsStatus(StringBuilder ineligibleActions)
088            throws CommandException {
089        boolean ret = true;
090        if (coordActions == null || coordActions.size() == 0) {
091            throw new CommandException(ErrorCode.E1024, "no actions are eligible to ignore");
092        }
093        for (CoordinatorActionBean action : coordActions) {
094            ParamChecker.notNull(action, "Action");
095            if (!(action.getStatus() == Status.FAILED || action.getStatus() == Status.KILLED
096                    || action.getStatus() == Status.TIMEDOUT)) {
097                LOG.info("Cannot ignore coord action = [{0}], since its status is [{1}]", action.getId(),
098                        action.getStatus());
099                if (ineligibleActions.length() != 0) {
100                    ineligibleActions.append(",");
101                }
102                ineligibleActions.append(action.getActionNumber());
103                ret = false;
104            }
105        }
106        return ret;
107    }
108
109    @Override
110    public void performWrites() throws CommandException {
111        try {
112            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
113        }
114        catch (JPAExecutorException jex) {
115            throw new CommandException(jex);
116        }
117    }
118
119    @Override
120    protected boolean isLockRequired() {
121        return true;
122    }
123
124    @Override
125    public String getEntityKey() {
126        return jobId;
127    }
128
129    @Override
130    protected void loadState() throws CommandException {
131        try{
132            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_STATUS_PARENTID, jobId);
133            coordActions = CoordUtils.getCoordActions(type, jobId, scope, false);
134        }catch (Exception ex){
135            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
136        }
137        LogUtils.setLogInfo(this.coordJob);
138    }
139}