001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import org.apache.oozie.CoordinatorActionBean;
021    import org.apache.oozie.WorkflowJobBean;
022    import org.apache.oozie.XException;
023    import org.apache.oozie.store.CoordinatorStore;
024    import org.apache.oozie.store.StoreException;
025    import org.apache.oozie.util.XLog;
026    import org.apache.oozie.util.db.SLADbOperations;
027    import org.apache.oozie.client.CoordinatorAction;
028    import org.apache.oozie.client.WorkflowJob;
029    import org.apache.oozie.client.SLAEvent.SlaAppType;
030    import org.apache.oozie.client.SLAEvent.Status;
031    import org.apache.oozie.command.CommandException;
032    
033    public class CoordActionUpdateCommand extends CoordinatorCommand<Void> {
034        private final XLog log = XLog.getLog(getClass());
035        private WorkflowJobBean workflow;
036        private CoordinatorActionBean caction = null;
037    
038        public CoordActionUpdateCommand(WorkflowJobBean workflow) {
039            super("coord-action-update", "coord-action-update", 1, XLog.OPS);
040            this.workflow = workflow;
041        }
042    
043        @Override
044        protected Void call(CoordinatorStore cstore) throws StoreException, CommandException {
045            try {
046                if (workflow.getStatus() == WorkflowJob.Status.RUNNING
047                        || workflow.getStatus() == WorkflowJob.Status.SUSPENDED) {
048                    //update lastModifiedTime
049                    cstore.updateCoordinatorAction(caction);
050                    return null;
051                }
052                // CoordinatorActionBean caction =
053                // cstore.getCoordinatorActionForExternalId(workflow.getId());
054                Status slaStatus = null;
055                if (caction != null) {
056                    if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
057                        caction.setStatus(CoordinatorAction.Status.SUCCEEDED);
058                        slaStatus = Status.SUCCEEDED;
059                    }
060                    else {
061                        if (workflow.getStatus() == WorkflowJob.Status.FAILED) {
062                            caction.setStatus(CoordinatorAction.Status.FAILED);
063                            slaStatus = Status.FAILED;
064                        }
065                        else {
066                            if (workflow.getStatus() == WorkflowJob.Status.KILLED) {
067                                caction.setStatus(CoordinatorAction.Status.KILLED);
068                                slaStatus = Status.KILLED;
069                            }
070                            else {
071                                log.warn(
072                                        "Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus());
073                                //update lastModifiedTime
074                                cstore.updateCoordinatorAction(caction);
075                                return null;
076                            }
077                        }
078                    }
079    
080                    log.info(
081                            "Updating Coordintaor id :" + caction.getId() + "status to =" + caction.getStatus());
082                    cstore.updateCoordinatorAction(caction);
083                    if (slaStatus != null) {
084                        SLADbOperations.writeStausEvent(caction.getSlaXml(), caction.getId(), cstore, slaStatus,
085                                                        SlaAppType.COORDINATOR_ACTION);
086                    }
087                    queueCallable(new CoordActionReadyCommand(caction.getJobId()));
088                }
089            }
090            catch (XException ex) {
091                log.warn("CoordActionUpdate Failed ", ex.getMessage());
092                throw new CommandException(ex);
093            }
094            return null;
095        }
096    
097        @Override
098        protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
099            log.info("STARTED CoordActionUpdateCommand for wfId=" + workflow.getId());
100            caction = store.getCoordinatorActionForExternalId(workflow.getId());
101            if (caction == null) {
102                log.info("ENDED CoordActionUpdateCommand for wfId=" + workflow.getId() + ", coord action is null");
103                return null;
104            }
105            setLogInfo(caction);
106            String jobId = caction.getJobId();
107            try {
108                if (lock(jobId)) {
109                    call(store);
110                }
111                else {
112                    queueCallable(new CoordActionUpdateCommand(workflow), LOCK_FAILURE_REQUEUE_INTERVAL);
113                    log.warn("CoordActionUpdateCommand lock was not acquired - failed JobId=" + jobId + ", wfId="
114                            + workflow.getId() + ". Requeing the same.");
115                }
116            }
117            catch (InterruptedException e) {
118                queueCallable(new CoordActionUpdateCommand(workflow), LOCK_FAILURE_REQUEUE_INTERVAL);
119                log.warn("CoordActionUpdateCommand lock acquiring failed with exception " + e.getMessage() + " for jobId="
120                        + jobId + ", wfId=" + workflow.getId() + ". Requeing the same.");
121            }
122            finally {
123                log.info("ENDED CoordActionUpdateCommand for wfId=" + workflow.getId() + ", jobId=" + jobId);
124            }
125            return null;
126        }
127    }