This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.List;
023
024 import org.apache.oozie.CoordinatorActionBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.SLAEventBean;
027 import org.apache.oozie.WorkflowJobBean;
028 import org.apache.oozie.XException;
029 import org.apache.oozie.service.JPAService;
030 import org.apache.oozie.service.Services;
031 import org.apache.oozie.util.LogUtils;
032 import org.apache.oozie.util.db.SLADbOperations;
033 import org.apache.oozie.client.CoordinatorAction;
034 import org.apache.oozie.client.WorkflowJob;
035 import org.apache.oozie.client.SLAEvent.SlaAppType;
036 import org.apache.oozie.client.SLAEvent.Status;
037 import org.apache.oozie.client.rest.JsonBean;
038 import org.apache.oozie.command.CommandException;
039 import org.apache.oozie.command.PreconditionException;
040 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
041 import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
042 import org.apache.oozie.executor.jpa.JPAExecutorException;
043
044 public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
045 private WorkflowJobBean workflow;
046 private CoordinatorActionBean coordAction = null;
047 private JPAService jpaService = null;
048 private int maxRetries = 1;
049 private List<JsonBean> updateList = new ArrayList<JsonBean>();
050 private List<JsonBean> insertList = new ArrayList<JsonBean>();
051
052 public CoordActionUpdateXCommand(WorkflowJobBean workflow) {
053 super("coord-action-update", "coord-action-update", 1);
054 this.workflow = workflow;
055 }
056
057 public CoordActionUpdateXCommand(WorkflowJobBean workflow, int maxRetries) {
058 super("coord-action-update", "coord-action-update", 1);
059 this.workflow = workflow;
060 this.maxRetries = maxRetries;
061 }
062
063 @Override
064 protected Void execute() throws CommandException {
065 try {
066 LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId());
067
068 Status slaStatus = null;
069 CoordinatorAction.Status preCoordStatus = coordAction.getStatus();
070 if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
071 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
072 coordAction.setPending(0);
073 slaStatus = Status.SUCCEEDED;
074 }
075 else if (workflow.getStatus() == WorkflowJob.Status.FAILED) {
076 coordAction.setStatus(CoordinatorAction.Status.FAILED);
077 coordAction.setPending(0);
078 slaStatus = Status.FAILED;
079 }
080 else if (workflow.getStatus() == WorkflowJob.Status.KILLED) {
081 coordAction.setStatus(CoordinatorAction.Status.KILLED);
082 coordAction.setPending(0);
083 slaStatus = Status.KILLED;
084 }
085 else if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) {
086 coordAction.setStatus(CoordinatorAction.Status.SUSPENDED);
087 coordAction.decrementAndGetPending();
088 }
089 else if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
090 // resume workflow job and update coord action accordingly
091 coordAction.setStatus(CoordinatorAction.Status.RUNNING);
092 coordAction.decrementAndGetPending();
093 }
094 else {
095 LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus());
096 // update lastModifiedTime
097 coordAction.setLastModifiedTime(new Date());
098 updateList.add(coordAction);
099 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
100 // TODO - Uncomment this when bottom up rerun can change terminal state
101 /* CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
102 if (!coordJob.isPending()) {
103 coordJob.setPending();
104 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
105 }*/
106 return null;
107 }
108
109 LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status from " + preCoordStatus
110 + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending());
111
112 coordAction.setLastModifiedTime(new Date());
113 updateList.add(coordAction);
114 // TODO - Uncomment this when bottom up rerun can change terminal state
115 /*CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
116 if (!coordJob.isPending()) {
117 coordJob.setPending();
118 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
119 LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true");
120 }*/
121 if (slaStatus != null) {
122 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
123 SlaAppType.COORDINATOR_ACTION, LOG);
124 if(slaEvent != null) {
125 insertList.add(slaEvent);
126 }
127 }
128 if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED
129 && workflow.getStatus() != WorkflowJob.Status.RUNNING) {
130 queue(new CoordActionReadyXCommand(coordAction.getJobId()));
131 }
132
133 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
134
135 LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
136 }
137 catch (XException ex) {
138 LOG.warn("CoordActionUpdate Failed ", ex.getMessage());
139 throw new CommandException(ex);
140 }
141 return null;
142 }
143
144 /* (non-Javadoc)
145 * @see org.apache.oozie.command.XCommand#getEntityKey()
146 */
147 @Override
148 public String getEntityKey() {
149 return coordAction.getJobId();
150 }
151
152 /* (non-Javadoc)
153 * @see org.apache.oozie.command.XCommand#isLockRequired()
154 */
155 @Override
156 protected boolean isLockRequired() {
157 return true;
158 }
159
160 /* (non-Javadoc)
161 * @see org.apache.oozie.command.XCommand#eagerLoadState()
162 */
163 @Override
164 protected void eagerLoadState() throws CommandException {
165 jpaService = Services.get().get(JPAService.class);
166 if (jpaService == null) {
167 throw new CommandException(ErrorCode.E0610);
168 }
169
170 int retries = 0;
171 while (retries++ < maxRetries) {
172 try {
173 coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(workflow.getId()));
174 if (coordAction != null) {
175 break;
176 }
177
178 if (retries < maxRetries) {
179 Thread.sleep(500);
180 }
181 }
182 catch (JPAExecutorException je) {
183 LOG.warn("Could not load coord action {0}", je.getMessage(), je);
184 }
185 catch (InterruptedException ex) {
186 LOG.warn("Retry to load coord action is interrupted {0}", ex.getMessage(), ex);
187 }
188 }
189
190 if (coordAction != null) {
191 LogUtils.setLogInfo(coordAction, logInfo);
192 }
193 }
194
195 /* (non-Javadoc)
196 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
197 */
198 @Override
199 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
200 if (coordAction == null) {
201 throw new PreconditionException(ErrorCode.E1100, ", coord action is null");
202 }
203 }
204
205 /* (non-Javadoc)
206 * @see org.apache.oozie.command.XCommand#loadState()
207 */
208 @Override
209 protected void loadState() throws CommandException {
210 }
211
212 /* (non-Javadoc)
213 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
214 */
215 @Override
216 protected void verifyPrecondition() throws CommandException, PreconditionException {
217
218 // if coord action is RUNNING and pending false and workflow is RUNNING, this doesn't need to be updated.
219 if (workflow.getStatus() == WorkflowJob.Status.RUNNING
220 && coordAction.getStatus() == CoordinatorAction.Status.RUNNING && !coordAction.isPending()) {
221 // update lastModifiedTime
222 coordAction.setLastModifiedTime(new Date());
223 try {
224 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(coordAction));
225 }
226 catch (JPAExecutorException je) {
227 throw new CommandException(je);
228 }
229 throw new PreconditionException(ErrorCode.E1100, ", workflow is RUNNING and coordinator action is RUNNING and pending false");
230 }
231 }
232
233 }