001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.sql.Timestamp;
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026
027import org.apache.oozie.CoordinatorActionBean;
028import org.apache.oozie.CoordinatorJobBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.SLAEventBean;
031import org.apache.oozie.WorkflowJobBean;
032import org.apache.oozie.XException;
033import org.apache.oozie.service.EventHandlerService;
034import org.apache.oozie.service.JPAService;
035import org.apache.oozie.service.Services;
036import org.apache.oozie.util.InstrumentUtils;
037import org.apache.oozie.util.LogUtils;
038import org.apache.oozie.util.ParamChecker;
039import org.apache.oozie.util.db.SLADbOperations;
040import org.apache.oozie.client.CoordinatorAction;
041import org.apache.oozie.client.WorkflowJob;
042import org.apache.oozie.client.SLAEvent.SlaAppType;
043import org.apache.oozie.client.SLAEvent.Status;
044import org.apache.oozie.client.rest.JsonBean;
045import org.apache.oozie.command.CommandException;
046import org.apache.oozie.command.PreconditionException;
047import org.apache.oozie.executor.jpa.BatchQueryExecutor;
048import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
049import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
050import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
051import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
052import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
053import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
054
055/**
056 * The command checks workflow status for coordinator action.
057 */
058@SuppressWarnings("deprecation")
059public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> {
060    private String actionId;
061    private int actionCheckDelay;
062    private CoordinatorActionBean coordAction = null;
063    private CoordinatorJobBean coordJob;
064    private WorkflowJobBean workflowJob;
065    private JPAService jpaService = null;
066    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
067    private List<JsonBean> insertList = new ArrayList<JsonBean>();
068
069    public CoordActionCheckXCommand(String actionId, int actionCheckDelay) {
070        super("coord_action_check", "coord_action_check", 0);
071        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
072        this.actionCheckDelay = actionCheckDelay;
073    }
074
075    /* (non-Javadoc)
076     * @see org.apache.oozie.command.XCommand#execute()
077     */
078    @Override
079    protected Void execute() throws CommandException {
080        try {
081            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
082            Status slaStatus = null;
083            CoordinatorAction.Status initialStatus = coordAction.getStatus();
084
085            if (workflowJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
086                coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
087                // set pending to false as the status is SUCCEEDED
088                coordAction.setPending(0);
089                slaStatus = Status.SUCCEEDED;
090            }
091            else {
092                if (workflowJob.getStatus() == WorkflowJob.Status.FAILED) {
093                    coordAction.setStatus(CoordinatorAction.Status.FAILED);
094                    slaStatus = Status.FAILED;
095                    // set pending to false as the status is FAILED
096                    coordAction.setPending(0);
097                }
098                else {
099                    if (workflowJob.getStatus() == WorkflowJob.Status.KILLED) {
100                        coordAction.setStatus(CoordinatorAction.Status.KILLED);
101                        slaStatus = Status.KILLED;
102                        // set pending to false as the status is KILLED
103                        coordAction.setPending(0);
104                    }
105                    else {
106                        LOG.warn("Unexpected workflow " + workflowJob.getId() + " STATUS " + workflowJob.getStatus());
107                        coordAction.setLastModifiedTime(new Date());
108                        CoordActionQueryExecutor.getInstance().executeUpdate(
109                                CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE,
110                                coordAction);
111                        return null;
112                    }
113                }
114            }
115
116            LOG.debug("Updating Coordinator actionId :" + coordAction.getId() + "status to ="
117                            + coordAction.getStatus());
118            coordAction.setLastModifiedTime(new Date());
119            updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
120                    coordAction));
121
122            if (slaStatus != null) {
123                SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
124                        SlaAppType.COORDINATOR_ACTION, LOG);
125                if(slaEvent != null) {
126                    insertList.add(slaEvent);
127                }
128            }
129
130            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
131            CoordinatorAction.Status endStatus = coordAction.getStatus();
132            if (endStatus != initialStatus && EventHandlerService.isEnabled()) {
133                generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflowJob.getStartTime());
134            }
135        }
136        catch (XException ex) {
137            LOG.warn("CoordActionCheckCommand Failed ", ex);
138            throw new CommandException(ex);
139        }
140        return null;
141    }
142
143    /* (non-Javadoc)
144     * @see org.apache.oozie.command.XCommand#getEntityKey()
145     */
146    @Override
147    public String getEntityKey() {
148        return actionId;
149    }
150
151    @Override
152    public String getKey() {
153        return getName() + "_" + actionId;
154    }
155
156    /* (non-Javadoc)
157     * @see org.apache.oozie.command.XCommand#isLockRequired()
158     */
159    @Override
160    protected boolean isLockRequired() {
161        return true;
162    }
163
164    /* (non-Javadoc)
165     * @see org.apache.oozie.command.XCommand#loadState()
166     */
167    @Override
168    protected void loadState() throws CommandException {
169        try {
170            jpaService = Services.get().get(JPAService.class);
171
172            if (jpaService != null) {
173                coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId));
174                coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
175                        coordAction.getJobId()));
176                workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId()));
177                LogUtils.setLogInfo(coordAction, logInfo);
178            }
179            else {
180                throw new CommandException(ErrorCode.E0610);
181            }
182        }
183        catch (XException ex) {
184            throw new CommandException(ex);
185        }
186    }
187
188    /* (non-Javadoc)
189     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
190     */
191    @Override
192    protected void verifyPrecondition() throws CommandException, PreconditionException {
193        // if the action has been updated, quit this command
194        Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
195        Timestamp cactionLmt = coordAction.getLastModifiedTimestamp();
196        if (cactionLmt.after(actionCheckTs)) {
197            throw new PreconditionException(ErrorCode.E1100, "The coord action :" + actionId
198                    + " has been udated. Ignore CoordActionCheckCommand!");
199        }
200        if (coordAction.getStatus().equals(CoordinatorAction.Status.SUCCEEDED)
201                || coordAction.getStatus().equals(CoordinatorAction.Status.FAILED)
202                || coordAction.getStatus().equals(CoordinatorAction.Status.KILLED)) {
203            throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must not have status "
204                    + CoordinatorAction.Status.SUCCEEDED.name() + ", " + CoordinatorAction.Status.FAILED.name()
205                    + ", or " + CoordinatorAction.Status.KILLED.name() + " but has status [" + coordAction.getStatus().name()
206                    + "]");
207        }
208    }
209}