This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.HashSet;
025 import java.util.List;
026 import java.util.Set;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.Path;
030 import org.apache.oozie.CoordinatorActionBean;
031 import org.apache.oozie.CoordinatorActionInfo;
032 import org.apache.oozie.CoordinatorJobBean;
033 import org.apache.oozie.ErrorCode;
034 import org.apache.oozie.XException;
035 import org.apache.oozie.action.ActionExecutorException;
036 import org.apache.oozie.action.hadoop.FsActionExecutor;
037 import org.apache.oozie.client.CoordinatorAction;
038 import org.apache.oozie.client.CoordinatorJob;
039 import org.apache.oozie.client.Job;
040 import org.apache.oozie.client.SLAEvent.SlaAppType;
041 import org.apache.oozie.client.rest.RestConstants;
042 import org.apache.oozie.command.CommandException;
043 import org.apache.oozie.command.PreconditionException;
044 import org.apache.oozie.command.RerunTransitionXCommand;
045 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
046 import org.apache.oozie.coord.CoordELFunctions;
047 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
048 import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor;
049 import org.apache.oozie.executor.jpa.CoordJobGetActionsForDatesJPAExecutor;
050 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
051 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
052 import org.apache.oozie.executor.jpa.JPAExecutorException;
053 import org.apache.oozie.service.JPAService;
054 import org.apache.oozie.service.Services;
055 import org.apache.oozie.util.DateUtils;
056 import org.apache.oozie.util.InstrumentUtils;
057 import org.apache.oozie.util.LogUtils;
058 import org.apache.oozie.util.ParamChecker;
059 import org.apache.oozie.util.StatusUtils;
060 import org.apache.oozie.util.XConfiguration;
061 import org.apache.oozie.util.XLog;
062 import org.apache.oozie.util.XmlUtils;
063 import org.apache.oozie.util.db.SLADbOperations;
064 import org.jdom.Element;
065 import org.jdom.JDOMException;
066
/**
 * Rerun coordinator actions selected by a list of dates or action ids. The caller can additionally request a
 * refresh of the actions and/or skip the cleanup of their output.
 * <p/>
 * The "rerunType" can be set to {@link RestConstants#JOB_COORD_RERUN_DATE} or
 * {@link RestConstants#JOB_COORD_RERUN_ACTION}.
 * <p/>
 * The "refresh" flag indicates whether the user wants to refresh an action's input and output events.
 * <p/>
 * The "noCleanup" flag indicates whether the user wants to skip the cleanup of output events for the given rerun
 * actions; when it is false the output events are cleaned up before the rerun.
 */
077 public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {
078
079 private String rerunType;
080 private String scope;
081 private boolean refresh;
082 private boolean noCleanup;
083 private CoordinatorJobBean coordJob = null;
084 private JPAService jpaService = null;
085 protected boolean prevPending;
086
/**
 * Creates a rerun command for the actions of one coordinator job.
 *
 * @param jobId the coordinator job id; checked with {@code ParamChecker.notEmpty}
 * @param rerunType rerun type, {@link RestConstants#JOB_COORD_RERUN_DATE} or
 *        {@link RestConstants#JOB_COORD_RERUN_ACTION}; checked with {@code ParamChecker.notEmpty}
 * @param scope the rerun scope for the given rerunType, entries separated by ","; checked with
 *        {@code ParamChecker.notEmpty}
 * @param refresh true if the user wants to refresh input/output dataset urls
 * @param noCleanup true if the output events of the rerun actions must NOT be cleaned up, false to clean them up
 */
public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
    super("coord_rerun", "coord_rerun", 1);
    // Plain flags need no validation.
    this.refresh = refresh;
    this.noCleanup = noCleanup;
    // String arguments are checked in declaration order.
    this.jobId = ParamChecker.notEmpty(jobId, "jobId");
    this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
    this.scope = ParamChecker.notEmpty(scope, "scope");
}
104
105 /**
106 * Get the list of actions for given id ranges
107 *
108 * @param jobId coordinator job id
109 * @param scope the id range to rerun separated by ","
110 * @return the list of all actions to rerun
111 * @throws CommandException thrown if failed to get coordinator actions by given id range
112 */
113 private List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException {
114 ParamChecker.notEmpty(jobId, "jobId");
115 ParamChecker.notEmpty(scope, "scope");
116
117 Set<String> actions = new HashSet<String>();
118 String[] list = scope.split(",");
119 for (String s : list) {
120 s = s.trim();
121 if (s.contains("-")) {
122 String[] range = s.split("-");
123 if (range.length != 2) {
124 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
125 }
126 int start;
127 int end;
128 try {
129 start = Integer.parseInt(range[0].trim());
130 end = Integer.parseInt(range[1].trim());
131 if (start > end) {
132 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
133 }
134 }
135 catch (NumberFormatException ne) {
136 throw new CommandException(ErrorCode.E0302, ne);
137 }
138 for (int i = start; i <= end; i++) {
139 actions.add(jobId + "@" + i);
140 }
141 }
142 else {
143 try {
144 Integer.parseInt(s);
145 }
146 catch (NumberFormatException ne) {
147 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
148 + "'. Integer only.");
149 }
150 actions.add(jobId + "@" + s);
151 }
152 }
153
154 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
155 for (String id : actions) {
156 CoordinatorActionBean coordAction;
157 try {
158 coordAction = jpaService.execute(new CoordActionGetJPAExecutor(id));
159 }
160 catch (JPAExecutorException je) {
161 throw new CommandException(je);
162 }
163 coordActions.add(coordAction);
164 LOG.debug("Rerun coordinator for actionId='" + id + "'");
165 }
166 return coordActions;
167 }
168
169 /**
170 * Get the list of actions for given date ranges
171 *
172 * @param jobId coordinator job id
173 * @param scope the date range to rerun separated by ","
174 * @return the list of dates to rerun
175 * @throws CommandException thrown if failed to get coordinator actions by given date range
176 */
177 private List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope) throws CommandException {
178 ParamChecker.notEmpty(jobId, "jobId");
179 ParamChecker.notEmpty(scope, "scope");
180
181 Set<CoordinatorActionBean> actionSet = new HashSet<CoordinatorActionBean>();
182 String[] list = scope.split(",");
183 for (String s : list) {
184 s = s.trim();
185 if (s.contains("::")) {
186 String[] dateRange = s.split("::");
187 if (dateRange.length != 2) {
188 throw new CommandException(ErrorCode.E0302, "format is wrong for date's range '" + s + "'");
189 }
190 Date start;
191 Date end;
192 try {
193 start = DateUtils.parseDateUTC(dateRange[0].trim());
194 end = DateUtils.parseDateUTC(dateRange[1].trim());
195 if (start.after(end)) {
196 throw new CommandException(ErrorCode.E0302, "start date is older than end date: '" + s + "'");
197 }
198 }
199 catch (Exception e) {
200 throw new CommandException(ErrorCode.E0302, e);
201 }
202
203 List<CoordinatorActionBean> listOfActions = getActionIdsFromDateRange(jobId, start, end);
204 actionSet.addAll(listOfActions);
205 }
206 else {
207 try {
208 Date date = DateUtils.parseDateUTC(s.trim());
209 CoordinatorActionBean coordAction = jpaService
210 .execute(new CoordJobGetActionForNominalTimeJPAExecutor(jobId, date));
211 actionSet.add(coordAction);
212 }
213 catch (JPAExecutorException e) {
214 throw new CommandException(e);
215 }
216 catch (Exception e) {
217 throw new CommandException(ErrorCode.E0302, e);
218 }
219 }
220 }
221
222 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
223 for (CoordinatorActionBean coordAction : actionSet) {
224 coordActions.add(coordAction);
225 LOG.debug("Rerun coordinator for actionId='" + coordAction.getId() + "'");
226 }
227 return coordActions;
228 }
229
230 /**
231 * Get coordinator action ids between given start and end time
232 *
233 * @param jobId coordinator job id
234 * @param start start time
235 * @param end end time
236 * @return a list of coordinator actions belong to the range of start and end time
237 * @throws CommandException thrown if failed to get coordinator actions
238 */
239 private List<CoordinatorActionBean> getActionIdsFromDateRange(String jobId, Date start, Date end)
240 throws CommandException {
241 List<CoordinatorActionBean> list;
242 try {
243 list = jpaService.execute(new CoordJobGetActionsForDatesJPAExecutor(jobId, start, end));
244 }
245 catch (JPAExecutorException je) {
246 throw new CommandException(je);
247 }
248 return list;
249 }
250
251 /**
252 * Check if all given actions are eligible to rerun.
253 *
254 * @param actions list of CoordinatorActionBean
255 * @return true if all actions are eligible to rerun
256 */
257 private boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
258 boolean ret = false;
259 for (CoordinatorActionBean coordAction : coordActions) {
260 ret = true;
261 if (!coordAction.isTerminalStatus()) {
262 ret = false;
263 break;
264 }
265 }
266 return ret;
267 }
268
269 /**
270 * Cleanup output-events directories
271 *
272 * @param eAction coordinator action xml
273 * @param user user name
274 * @param group group name
275 */
276 @SuppressWarnings("unchecked")
277 private void cleanupOutputEvents(Element eAction, String user, String group) {
278 Element outputList = eAction.getChild("output-events", eAction.getNamespace());
279 if (outputList != null) {
280 for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
281 if (data.getChild("uris", data.getNamespace()) != null) {
282 String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
283 if (uris != null) {
284 String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
285 FsActionExecutor fsAe = new FsActionExecutor();
286 for (String uri : uriArr) {
287 Path path = new Path(uri);
288 try {
289 fsAe.delete(user, group, path);
290 LOG.debug("Cleanup the output dir " + path);
291 }
292 catch (ActionExecutorException ae) {
293 LOG.warn("Failed to cleanup the output dir " + uri, ae);
294 }
295 }
296 }
297
298 }
299 }
300 }
301 else {
302 LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
303 }
304 }
305
306 /**
307 * Refresh an action's input and ouput events.
308 *
309 * @param coordJob coordinator job bean
310 * @param coordAction coordinator action bean
311 * @throws Exception thrown if failed to materialize coordinator action
312 */
313 private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
314 Configuration jobConf = null;
315 try {
316 jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
317 }
318 catch (IOException ioe) {
319 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
320 throw new CommandException(ErrorCode.E1005, ioe);
321 }
322 String jobXml = coordJob.getJobXml();
323 Element eJob = XmlUtils.parseXml(jobXml);
324 Date actualTime = new Date();
325 String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
326 .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
327 LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
328 + XmlUtils.prettyPrint(actionXml).toString());
329 coordAction.setActionXml(actionXml);
330 }
331
332 /**
333 * Update an action into database table
334 *
335 * @param coordJob coordinator job bean
336 * @param coordAction coordinator action bean
337 * @param actionXml coordinator action xml
338 * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
339 */
340 private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml)
341 throws Exception {
342 LOG.debug("updateAction for actionId=" + coordAction.getId());
343 if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
344 LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
345 coordAction.setCreatedTime(new Date());
346 }
347 coordAction.setStatus(CoordinatorAction.Status.WAITING);
348 coordAction.setExternalId("");
349 coordAction.setExternalStatus("");
350 coordAction.setRerunTime(new Date());
351 coordAction.setLastModifiedTime(new Date());
352 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction));
353 writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
354 }
355
356 /**
357 * Create SLA RegistrationEvent
358 *
359 * @param actionXml action xml
360 * @param actionBean coordinator action bean
361 * @param user user name
362 * @param group group name
363 * @throws Exception thrown if unable to write sla registration event
364 */
365 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group)
366 throws Exception {
367 Element eAction = XmlUtils.parseXml(actionXml);
368 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
369 SLADbOperations.writeSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user, group,
370 LOG);
371 }
372
373 /* (non-Javadoc)
374 * @see org.apache.oozie.command.XCommand#getEntityKey()
375 */
376 @Override
377 protected String getEntityKey() {
378 return jobId;
379 }
380
381 /* (non-Javadoc)
382 * @see org.apache.oozie.command.XCommand#isLockRequired()
383 */
384 @Override
385 protected boolean isLockRequired() {
386 return true;
387 }
388
389 /* (non-Javadoc)
390 * @see org.apache.oozie.command.XCommand#loadState()
391 */
392 @Override
393 protected void loadState() throws CommandException {
394 jpaService = Services.get().get(JPAService.class);
395 if (jpaService == null) {
396 throw new CommandException(ErrorCode.E0610);
397 }
398 try {
399 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
400 prevPending = coordJob.isPending();
401 }
402 catch (JPAExecutorException je) {
403 throw new CommandException(je);
404 }
405 LogUtils.setLogInfo(coordJob, logInfo);
406 }
407
408 /* (non-Javadoc)
409 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
410 */
411 @Override
412 protected void verifyPrecondition() throws CommandException, PreconditionException {
413 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
414 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
415 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
416 throw new CommandException(ErrorCode.E1018,
417 "coordinator job is killed or failed so all actions are not eligible to rerun!");
418 }
419
420 // no actioins have been created for PREP job
421 if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
422 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
423 throw new CommandException(ErrorCode.E1018,
424 "coordinator job is PREP so no actions are materialized to rerun!");
425 }
426 }
427
428 @Override
429 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
430 verifyPrecondition();
431 }
432
433 @Override
434 public void rerunChildren() throws CommandException {
435 boolean isError = false;
436 try {
437 CoordinatorActionInfo coordInfo = null;
438 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
439 List<CoordinatorActionBean> coordActions;
440 if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) {
441 coordActions = getCoordActionsFromDates(jobId, scope);
442 }
443 else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) {
444 coordActions = getCoordActionsFromIds(jobId, scope);
445 }
446 else {
447 isError = true;
448 throw new CommandException(ErrorCode.E1018, "date or action expected.");
449 }
450 if (checkAllActionsRunnable(coordActions)) {
451 for (CoordinatorActionBean coordAction : coordActions) {
452 String actionXml = coordAction.getActionXml();
453 if (!noCleanup) {
454 Element eAction = XmlUtils.parseXml(actionXml);
455 cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup());
456 }
457 if (refresh) {
458 refreshAction(coordJob, coordAction);
459 }
460 updateAction(coordJob, coordAction, actionXml);
461
462 queue(new CoordActionNotificationXCommand(coordAction), 100);
463 queue(new CoordActionInputCheckXCommand(coordAction.getId()), 100);
464 }
465 }
466 else {
467 isError = true;
468 throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
469 }
470 coordInfo = new CoordinatorActionInfo(coordActions);
471
472 ret = coordInfo;
473 }
474 catch (XException xex) {
475 isError = true;
476 throw new CommandException(xex);
477 }
478 catch (JDOMException jex) {
479 isError = true;
480 throw new CommandException(ErrorCode.E0700, jex);
481 }
482 catch (Exception ex) {
483 isError = true;
484 throw new CommandException(ErrorCode.E1018, ex);
485 }
486 finally{
487 if(isError){
488 transitToPrevious();
489 }
490 }
491 }
492
493 /*
494 * (non-Javadoc)
495 * @see org.apache.oozie.command.TransitionXCommand#getJob()
496 */
497 @Override
498 public Job getJob() {
499 return coordJob;
500 }
501
502 @Override
503 public void notifyParent() throws CommandException {
504 //update bundle action
505 if (getPrevStatus() != null && coordJob.getBundleId() != null) {
506 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus());
507 bundleStatusUpdate.call();
508 }
509 }
510
511 @Override
512 public void updateJob() throws CommandException {
513 try {
514 // rerun a paused coordinator job will keep job status at paused and pending at previous pending
515 if (getPrevStatus()!= null && getPrevStatus().equals(Job.Status.PAUSED)) {
516 coordJob.setStatus(Job.Status.PAUSED);
517 if (prevPending) {
518 coordJob.setPending();
519 } else {
520 coordJob.resetPending();
521 }
522 }
523
524 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
525 }
526 catch (JPAExecutorException je) {
527 throw new CommandException(je);
528 }
529 }
530
531 /* (non-Javadoc)
532 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
533 */
534 @Override
535 public XLog getLog() {
536 return LOG;
537 }
538
539 @Override
540 public final void transitToNext() {
541 prevStatus = coordJob.getStatus();
542 coordJob.setStatus(Job.Status.RUNNING);
543 // used for backward support of coordinator 0.1 schema
544 coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
545 coordJob.setPending();
546 }
547
548 private final void transitToPrevious() throws CommandException {
549 coordJob.setStatus(getPrevStatus());
550 if (!prevPending) {
551 coordJob.resetPending();
552 }
553 else {
554 coordJob.setPending();
555 }
556 }
557 }