This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Date;
023 import java.util.List;
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.hadoop.fs.Path;
026 import org.apache.oozie.CoordinatorActionBean;
027 import org.apache.oozie.CoordinatorActionInfo;
028 import org.apache.oozie.CoordinatorJobBean;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.SLAEventBean;
031 import org.apache.oozie.XException;
032 import org.apache.oozie.action.ActionExecutorException;
033 import org.apache.oozie.action.hadoop.FsActionExecutor;
034 import org.apache.oozie.client.CoordinatorAction;
035 import org.apache.oozie.client.CoordinatorJob;
036 import org.apache.oozie.client.Job;
037 import org.apache.oozie.client.SLAEvent.SlaAppType;
038 import org.apache.oozie.client.rest.RestConstants;
039 import org.apache.oozie.command.CommandException;
040 import org.apache.oozie.command.PreconditionException;
041 import org.apache.oozie.command.RerunTransitionXCommand;
042 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
043 import org.apache.oozie.coord.CoordELFunctions;
044 import org.apache.oozie.coord.CoordUtils;
045 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
046 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047 import org.apache.oozie.executor.jpa.JPAExecutorException;
048 import org.apache.oozie.service.JPAService;
049 import org.apache.oozie.service.Services;
050 import org.apache.oozie.util.InstrumentUtils;
051 import org.apache.oozie.util.LogUtils;
052 import org.apache.oozie.util.ParamChecker;
053 import org.apache.oozie.util.StatusUtils;
054 import org.apache.oozie.util.XConfiguration;
055 import org.apache.oozie.util.XLog;
056 import org.apache.oozie.util.XmlUtils;
057 import org.apache.oozie.util.db.SLADbOperations;
058 import org.jdom.Element;
059 import org.jdom.JDOMException;
060
061 /**
062 * Rerun coordinator actions by a list of dates or ids. User can specify if refresh or noCleanup.
063 * <p/>
064 * The "rerunType" can be set as {@link RestConstants.JOB_COORD_RERUN_DATE} or
065 * {@link RestConstants.JOB_COORD_RERUN_ACTION}.
066 * <p/>
067 * The "refresh" is used to indicate if user wants to refresh an action's input and output events.
068 * <p/>
069 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
070 */
071 public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {
072
073 private String rerunType;
074 private String scope;
075 private boolean refresh;
076 private boolean noCleanup;
077 private CoordinatorJobBean coordJob = null;
078 private JPAService jpaService = null;
079 protected boolean prevPending;
080
081 /**
082 * The constructor for class {@link CoordRerunXCommand}
083 *
084 * @param jobId the job id
085 * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION}
086 * @param scope the rerun scope for given rerunType separated by ","
087 * @param refresh true if user wants to refresh input/output dataset urls
088 * @param noCleanup false if user wants to cleanup output events for given rerun actions
089 */
090 public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
091 super("coord_rerun", "coord_rerun", 1);
092 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
093 this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
094 this.scope = ParamChecker.notEmpty(scope, "scope");
095 this.refresh = refresh;
096 this.noCleanup = noCleanup;
097 }
098
099 /**
100 * Check if all given actions are eligible to rerun.
101 *
102 * @param actions list of CoordinatorActionBean
103 * @return true if all actions are eligible to rerun
104 */
105 private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
106 ParamChecker.notNull(coordActions, "Coord actions to be rerun");
107 boolean ret = false;
108 for (CoordinatorActionBean coordAction : coordActions) {
109 ret = true;
110 if (!coordAction.isTerminalStatus()) {
111 ret = false;
112 break;
113 }
114 }
115 return ret;
116 }
117
118 /**
119 * Get the list of actions for a given coordinator job
120 * @param rerunType the rerun type (date, action)
121 * @param jobId the coordinator job id
122 * @param scope the date scope or action id scope
123 * @return the list of Coordinator actions
124 * @throws CommandException
125 */
126 public static List<CoordinatorActionBean> getCoordActions(String rerunType, String jobId, String scope) throws CommandException{
127 List<CoordinatorActionBean> coordActions = null;
128 if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) {
129 coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope);
130 }
131 else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) {
132 coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope);
133 }
134 return coordActions;
135 }
136
137 /**
138 * Cleanup output-events directories
139 *
140 * @param eAction coordinator action xml
141 * @param user user name
142 * @param group group name
143 */
144 @SuppressWarnings("unchecked")
145 private void cleanupOutputEvents(Element eAction, String user, String group) {
146 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
147 if (outputList != null) {
148 for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
149 if (data.getChild("uris", data.getNamespace()) != null) {
150 String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
151 if (uris != null) {
152 String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
153 FsActionExecutor fsAe = new FsActionExecutor();
154 for (String uri : uriArr) {
155 Path path = new Path(uri);
156 try {
157 fsAe.delete(user, group, path);
158 LOG.debug("Cleanup the output dir " + path);
159 }
160 catch (ActionExecutorException ae) {
161 LOG.warn("Failed to cleanup the output dir " + uri, ae);
162 }
163 }
164 }
165
166 }
167 }
168 }
169 else {
170 LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
171 }
172 }
173
174 /**
175 * Refresh an action's input and ouput events.
176 *
177 * @param coordJob coordinator job bean
178 * @param coordAction coordinator action bean
179 * @throws Exception thrown if failed to materialize coordinator action
180 */
181 private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
182 Configuration jobConf = null;
183 try {
184 jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
185 }
186 catch (IOException ioe) {
187 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
188 throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
189 }
190 String jobXml = coordJob.getJobXml();
191 Element eJob = XmlUtils.parseXml(jobXml);
192 Date actualTime = new Date();
193 String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
194 .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
195 LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
196 + XmlUtils.prettyPrint(actionXml).toString());
197 coordAction.setActionXml(actionXml);
198 }
199
200 /**
201 * Update an action into database table
202 *
203 * @param coordJob coordinator job bean
204 * @param coordAction coordinator action bean
205 * @param actionXml coordinator action xml
206 * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
207 */
208 private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml)
209 throws Exception {
210 LOG.debug("updateAction for actionId=" + coordAction.getId());
211 if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
212 LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
213 coordAction.setCreatedTime(new Date());
214 }
215 coordAction.setStatus(CoordinatorAction.Status.WAITING);
216 coordAction.setExternalId("");
217 coordAction.setExternalStatus("");
218 coordAction.setRerunTime(new Date());
219 coordAction.setLastModifiedTime(new Date());
220 updateList.add(coordAction);
221 writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
222 }
223
224 /**
225 * Create SLA RegistrationEvent
226 *
227 * @param actionXml action xml
228 * @param actionBean coordinator action bean
229 * @param user user name
230 * @param group group name
231 * @throws Exception thrown if unable to write sla registration event
232 */
233 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group)
234 throws Exception {
235 Element eAction = XmlUtils.parseXml(actionXml);
236 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
237 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
238 SlaAppType.COORDINATOR_ACTION, user, group, LOG);
239 if(slaEvent != null) {
240 insertList.add(slaEvent);
241 }
242 }
243
244 /* (non-Javadoc)
245 * @see org.apache.oozie.command.XCommand#getEntityKey()
246 */
247 @Override
248 public String getEntityKey() {
249 return jobId;
250 }
251
252 /* (non-Javadoc)
253 * @see org.apache.oozie.command.XCommand#isLockRequired()
254 */
255 @Override
256 protected boolean isLockRequired() {
257 return true;
258 }
259
260 /* (non-Javadoc)
261 * @see org.apache.oozie.command.XCommand#loadState()
262 */
263 @Override
264 protected void loadState() throws CommandException {
265 jpaService = Services.get().get(JPAService.class);
266 if (jpaService == null) {
267 throw new CommandException(ErrorCode.E0610);
268 }
269 try {
270 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
271 prevPending = coordJob.isPending();
272 }
273 catch (JPAExecutorException je) {
274 throw new CommandException(je);
275 }
276 LogUtils.setLogInfo(coordJob, logInfo);
277 }
278
279 /* (non-Javadoc)
280 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
281 */
282 @Override
283 protected void verifyPrecondition() throws CommandException, PreconditionException {
284 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, coordJob.getStatus());
285 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
286 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
287 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
288 // Call the parent so the pending flag is reset and state transition
289 // of bundle can happen
290 if (coordJob.getBundleId() != null) {
291 bundleStatusUpdate.call();
292 }
293 throw new CommandException(ErrorCode.E1018,
294 "coordinator job is killed or failed so all actions are not eligible to rerun!");
295 }
296
297 // no actioins have been created for PREP job
298 if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
299 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
300 // Call the parent so the pending flag is reset and state transition
301 // of bundle can happen
302 if (coordJob.getBundleId() != null) {
303 bundleStatusUpdate.call();
304 }
305 throw new CommandException(ErrorCode.E1018,
306 "coordinator job is PREP so no actions are materialized to rerun!");
307 }
308 }
309
310 @Override
311 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
312 verifyPrecondition();
313 }
314
315 @Override
316 public void rerunChildren() throws CommandException {
317 boolean isError = false;
318 try {
319 CoordinatorActionInfo coordInfo = null;
320 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
321 List<CoordinatorActionBean> coordActions = getCoordActions(rerunType, jobId, scope);
322 if (checkAllActionsRunnable(coordActions)) {
323 for (CoordinatorActionBean coordAction : coordActions) {
324 String actionXml = coordAction.getActionXml();
325 if (!noCleanup) {
326 Element eAction = XmlUtils.parseXml(actionXml);
327 cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup());
328 }
329 if (refresh) {
330 refreshAction(coordJob, coordAction);
331 }
332 updateAction(coordJob, coordAction, actionXml);
333
334 queue(new CoordActionNotificationXCommand(coordAction), 100);
335 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
336 }
337 }
338 else {
339 isError = true;
340 throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
341 }
342 coordInfo = new CoordinatorActionInfo(coordActions);
343
344 ret = coordInfo;
345 }
346 catch (XException xex) {
347 isError = true;
348 throw new CommandException(xex);
349 }
350 catch (JDOMException jex) {
351 isError = true;
352 throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
353 }
354 catch (Exception ex) {
355 isError = true;
356 throw new CommandException(ErrorCode.E1018, ex.getMessage(), ex);
357 }
358 finally{
359 if(isError){
360 transitToPrevious();
361 }
362 }
363 }
364
365 /*
366 * (non-Javadoc)
367 * @see org.apache.oozie.command.TransitionXCommand#getJob()
368 */
369 @Override
370 public Job getJob() {
371 return coordJob;
372 }
373
374 @Override
375 public void notifyParent() throws CommandException {
376 //update bundle action
377 if (getPrevStatus() != null && coordJob.getBundleId() != null) {
378 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus());
379 bundleStatusUpdate.call();
380 }
381 }
382
383 @Override
384 public void updateJob() {
385 if (getPrevStatus()!= null){
386 Job.Status coordJobStatus = getPrevStatus();
387 if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
388 coordJob.setStatus(coordJobStatus);
389 }
390 if (prevPending) {
391 coordJob.setPending();
392 } else {
393 coordJob.resetPending();
394 }
395 }
396
397 updateList.add(coordJob);
398 }
399
400 /* (non-Javadoc)
401 * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
402 */
403 @Override
404 public void performWrites() throws CommandException {
405 try {
406 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
407 }
408 catch (JPAExecutorException e) {
409 throw new CommandException(e);
410 }
411 }
412
413 /* (non-Javadoc)
414 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
415 */
416 @Override
417 public XLog getLog() {
418 return LOG;
419 }
420
421 @Override
422 public final void transitToNext() {
423 prevStatus = coordJob.getStatus();
424 if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED
425 || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) {
426 coordJob.setStatus(Job.Status.RUNNING);
427 }
428 else {
429 // Check for backward compatibility for Oozie versions (3.2 and before)
430 // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
431 // PAUSEDWITHERROR is not supported
432 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
433 }
434 // used for backward support of coordinator 0.1 schema
435 coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
436 coordJob.setPending();
437 }
438
439 private final void transitToPrevious() throws CommandException {
440 coordJob.setStatus(getPrevStatus());
441 if (!prevPending) {
442 coordJob.resetPending();
443 }
444 else {
445 coordJob.setPending();
446 }
447 }
448 }