001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.oozie.AppType;
022import org.apache.oozie.CoordinatorActionBean;
023import org.apache.oozie.CoordinatorJobBean;
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.SLAEventBean;
026import org.apache.oozie.client.CoordinatorJob;
027import org.apache.oozie.client.Job;
028import org.apache.oozie.client.SLAEvent.SlaAppType;
029import org.apache.oozie.client.rest.JsonBean;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.command.MaterializeTransitionXCommand;
032import org.apache.oozie.command.PreconditionException;
033import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
034import org.apache.oozie.coord.TimeUnit;
035import org.apache.oozie.executor.jpa.BatchQueryExecutor;
036import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
037import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
038import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
040import org.apache.oozie.executor.jpa.JPAExecutorException;
041import org.apache.oozie.service.CoordMaterializeTriggerService;
042import org.apache.oozie.service.EventHandlerService;
043import org.apache.oozie.service.JPAService;
044import org.apache.oozie.service.Service;
045import org.apache.oozie.service.Services;
046import org.apache.oozie.sla.SLAOperations;
047import org.apache.oozie.util.DateUtils;
048import org.apache.oozie.util.Instrumentation;
049import org.apache.oozie.util.LogUtils;
050import org.apache.oozie.util.ParamChecker;
051import org.apache.oozie.util.StatusUtils;
052import org.apache.oozie.util.XConfiguration;
053import org.apache.oozie.util.XmlUtils;
054import org.apache.oozie.util.db.SLADbOperations;
055import org.jdom.Element;
056import org.jdom.JDOMException;
057
058import java.io.IOException;
059import java.io.StringReader;
060import java.sql.Timestamp;
061import java.util.Calendar;
062import java.util.Date;
063import java.util.TimeZone;
064
065/**
066 * Materialize actions for specified start and end time for coordinator job.
067 */
068@SuppressWarnings("deprecation")
069public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
070
    // JPA service handle; looked up from Services in loadState()
    private JPAService jpaService = null;
    // Coordinator job bean being materialized; loaded in loadState()
    private CoordinatorJobBean coordJob = null;
    // Id of the coordinator job; also used as the entity key and as part of the command key
    private String jobId = null;
    // Start of the window to materialize; computed in calcMatdTime()
    private Date startMatdTime = null;
    // End of the window to materialize; computed in calcMatdTime() and overwritten by materializeActions()
    private Date endMatdTime = null;
    // Window length added to the start time in calcMatdTime(), where it is treated as seconds
    private final int materializationWindow;
    private int lastActionNumber = 1; // over-ride by DB value
    // Job status captured in loadState(); compared against the current status in notifyParent()
    private CoordinatorJob.Status prevStatus = null;

    // Read once from the service configuration at class-initialization time; verifyPrecondition()
    // multiplies it by 1000 before adding it to a millisecond clock value.
    static final private int lookAheadWindow = Services
            .get()
            .getConf()
            .getInt(CoordMaterializeTriggerService.CONF_LOOKUP_INTERVAL,
                    CoordMaterializeTriggerService.CONF_LOOKUP_INTERVAL_DEFAULT);

    /**
     * Default MAX timeout in minutes, after which coordinator input check will timeout
     */
    public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
090
091    /**
092     * The constructor for class {@link CoordMaterializeTransitionXCommand}
093     *
094     * @param jobId coordinator job id
095     * @param materializationWindow materialization window to calculate end time
096     * @param lookahead window
097     */
098    public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) {
099        super("coord_mater", "coord_mater", 1);
100        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
101        this.materializationWindow = materializationWindow;
102    }
103
104    /* (non-Javadoc)
105     * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext()
106     */
107    @Override
108    public void transitToNext() throws CommandException {
109    }
110
111    /* (non-Javadoc)
112     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
113     */
114    @Override
115    public void updateJob() throws CommandException {
116        updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE,coordJob));
117    }
118
119    /* (non-Javadoc)
120     * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites()
121     */
122    @Override
123    public void performWrites() throws CommandException {
124        try {
125            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
126            // register the partition related dependencies of actions
127            for (JsonBean actionBean : insertList) {
128                if (actionBean instanceof CoordinatorActionBean) {
129                    CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
130                    if (EventHandlerService.isEnabled()) {
131                        CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
132                    }
133
134                    // TODO: time 100s should be configurable
135                    queue(new CoordActionNotificationXCommand(coordAction), 100);
136
137                    //Delay for input check = (nominal time - now)
138                    long checkDelay = coordAction.getNominalTime().getTime() - new Date().getTime();
139                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
140                        Math.max(checkDelay, 0));
141
142                    if (coordAction.getPushMissingDependencies() != null) {
143                        // TODO: Delay in catchup mode?
144                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
145                    }
146                }
147            }
148        }
149        catch (JPAExecutorException jex) {
150            throw new CommandException(jex);
151        }
152    }
153
154    /* (non-Javadoc)
155     * @see org.apache.oozie.command.XCommand#getEntityKey()
156     */
157    @Override
158    public String getEntityKey() {
159        return this.jobId;
160    }
161
162    @Override
163    protected boolean isLockRequired() {
164        return true;
165    }
166
167    /* (non-Javadoc)
168     * @see org.apache.oozie.command.XCommand#loadState()
169     */
170    @Override
171    protected void loadState() throws CommandException {
172        jpaService = Services.get().get(JPAService.class);
173        if (jpaService == null) {
174            LOG.error(ErrorCode.E0610);
175        }
176
177        try {
178            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_MATERIALIZE, jobId);
179            prevStatus = coordJob.getStatus();
180        }
181        catch (JPAExecutorException jex) {
182            throw new CommandException(jex);
183        }
184
185        // calculate start materialize and end materialize time
186        calcMatdTime();
187
188        LogUtils.setLogInfo(coordJob, logInfo);
189    }
190
191    /**
192     * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null
193     *
194     * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime
195     */
196    protected void calcMatdTime() throws CommandException {
197        Timestamp startTime = coordJob.getNextMaterializedTimestamp();
198        if (startTime == null) {
199            startTime = coordJob.getStartTimestamp();
200        }
201        // calculate end time by adding materializationWindow to start time.
202        // need to convert materializationWindow from secs to milliseconds
203        long startTimeMilli = startTime.getTime();
204        long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
205
206        startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli));
207        endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli));
208        endMatdTime = getMaterializationTimeForCatchUp(endMatdTime);
209        // if MaterializationWindow end time is greater than endTime
210        // for job, then set it to endTime of job
211        Date jobEndTime = coordJob.getEndTime();
212        if (endMatdTime.compareTo(jobEndTime) > 0) {
213            endMatdTime = jobEndTime;
214        }
215
216        LOG.debug("Materializing coord job id=" + jobId + ", start=" + DateUtils.formatDateOozieTZ(startMatdTime) + ", end=" + DateUtils.formatDateOozieTZ(endMatdTime)
217                + ", window=" + materializationWindow);
218    }
219
220    /**
221     * Get materialization for window for catch-up jobs. for current jobs,it reruns currentMatdate, For catch-up, end
222     * Mataterilized Time = startMatdTime + MatThrottling * frequency; unless LAST_ONLY execution order is set, in which
223     * case it returns now (to materialize all actions in the past)
224     *
225     * @param currentMatTime
226     * @return
227     * @throws CommandException
228     * @throws JDOMException
229     */
230    private Date getMaterializationTimeForCatchUp(Date currentMatTime) throws CommandException {
231        if (currentMatTime.after(new Date())) {
232            return currentMatTime;
233        }
234        if (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) ||
235                coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE)) {
236            return new Date();
237        }
238        int frequency = 0;
239        try {
240            frequency = Integer.parseInt(coordJob.getFrequency());
241        }
242        catch (NumberFormatException e) {
243            return currentMatTime;
244        }
245
246        TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
247        TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
248        Calendar startInstance = Calendar.getInstance(appTz);
249        startInstance.setTime(startMatdTime);
250        Calendar endMatInstance = null;
251        Calendar previousInstance = startInstance;
252        for (int i = 1; i <= coordJob.getMatThrottling(); i++) {
253            endMatInstance = (Calendar) startInstance.clone();
254            endMatInstance.add(freqTU.getCalendarUnit(), i * frequency);
255            if (endMatInstance.getTime().compareTo(new Date()) >= 0) {
256                if (previousInstance.getTime().after(currentMatTime)) {
257                    return previousInstance.getTime();
258                }
259                else {
260                    return currentMatTime;
261                }
262            }
263            previousInstance = endMatInstance;
264        }
265        if (endMatInstance == null) {
266            return currentMatTime;
267        }
268        else {
269            return endMatInstance.getTime();
270        }
271    }
272
273    /* (non-Javadoc)
274     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
275     */
276    @Override
277    protected void verifyPrecondition() throws CommandException, PreconditionException {
278        if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING
279                || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) {
280            throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
281                    + " job is not in PREP or RUNNING but in " + coordJob.getStatus());
282        }
283
284        if (coordJob.isDoneMaterialization()) {
285            throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId =" + jobId
286                    + " job is already materialized");
287        }
288
289        if (coordJob.getNextMaterializedTimestamp() != null
290                && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) {
291            throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
292                    + " job is already materialized");
293        }
294
295        Timestamp startTime = coordJob.getNextMaterializedTimestamp();
296        if (startTime == null) {
297            startTime = coordJob.getStartTimestamp();
298
299            if (startTime.after(new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) {
300                throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId="
301                        + jobId + " job's start time is not reached yet - nothing to materialize");
302            }
303        }
304
305        if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) {
306            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
307                    + ", all actions have been materialized from start time = " + coordJob.getStartTime()
308                    + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr());
309        }
310
311        if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) {
312            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
313                    + ", action is *already* materialized for Materialization start time = " + startMatdTime
314                    + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr());
315        }
316
317        if (endMatdTime.after(coordJob.getEndTime())) {
318            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
319                    + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = "
320                    + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr());
321        }
322
323        if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) {
324            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
325                    + ", materialization start time = " + startMatdTime
326                    + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime()
327                    + ", job status = " + coordJob.getStatusStr());
328        }
329
330    }
331
332    /* (non-Javadoc)
333     * @see org.apache.oozie.command.MaterializeTransitionXCommand#materialize()
334     */
335    @Override
336    protected void materialize() throws CommandException {
337        Instrumentation.Cron cron = new Instrumentation.Cron();
338        cron.start();
339        try {
340            materializeActions(false);
341            updateJobMaterializeInfo(coordJob);
342        }
343        catch (CommandException ex) {
344            LOG.warn("Exception occurred:" + ex.getMessage() + " Making the job failed ", ex);
345            coordJob.setStatus(Job.Status.FAILED);
346            coordJob.resetPending();
347            // remove any materialized actions and slaEvents
348            insertList.clear();
349        }
350        catch (Exception e) {
351            LOG.error("Exception occurred:" + e.getMessage() + " Making the job failed ", e);
352            coordJob.setStatus(Job.Status.FAILED);
353            try {
354                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, coordJob);
355            }
356            catch (JPAExecutorException jex) {
357                throw new CommandException(ErrorCode.E1011, jex);
358            }
359            throw new CommandException(ErrorCode.E1012, e.getMessage(), e);
360        } finally {
361            cron.stop();
362            instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".materialize", cron);
363        }
364    }
365
    /**
     * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table.
     * <p>
     * For integer frequencies each action time is computed as the job's (end-of-duration adjusted)
     * start time plus {@code lastActionNumber * frequency}; for non-integer (cron) frequencies the
     * next time is obtained from {@link CoordCommandUtils#getNextValidActionTimeForCronFrequency}.
     * Creation stops at the window end, at the job's pause time, or when the throttle budget
     * (materialization throttle minus the active-action count) is used up, unless the budget is
     * ignored for LAST_ONLY/NONE execution with a window that lies entirely in the past.
     * <p>
     * Side effects: updates {@code lastActionNumber} and overwrites {@code endMatdTime} with the
     * time at which the loop stopped.
     *
     * @param dryrun if this is a dry run
     * @return when not a dry run, the XML of the last action created (null if none was created);
     *         for a dry run, the concatenation of all generated actions, each preceded by the
     *         text "action for new instance"
     * @throws Exception thrown if failed to materialize actions
     */
    protected String materializeActions(boolean dryrun) throws Exception {

        Configuration jobConf = null;
        try {
            jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
        }
        catch (IOException ioe) {
            LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
            throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
        }

        String jobXml = coordJob.getJobXml();
        Element eJob = XmlUtils.parseXml(jobXml);
        TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());

        String frequency = coordJob.getFrequency();
        TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
        TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
        // Window boundaries as calendars in the application's time zone.
        Calendar start = Calendar.getInstance(appTz);
        start.setTime(startMatdTime);
        DateUtils.moveToEnd(start, endOfFlag);
        Calendar end = Calendar.getInstance(appTz);
        end.setTime(endMatdTime);
        lastActionNumber = coordJob.getLastActionNumber();
        //Intentionally printing dates in their own timezone, not Oozie timezone
        LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end="
                + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":"
                + freqTU + ",\n lastActionNumber " + lastActionNumber);
        // Keep the actual start time
        Calendar origStart = Calendar.getInstance(appTz);
        origStart.setTime(coordJob.getStartTimestamp());
        // Move to the End of duration, if needed.
        DateUtils.moveToEnd(origStart, endOfFlag);

        StringBuilder actionStrings = new StringBuilder();
        Date jobPauseTime = coordJob.getPauseTime();
        // pause stays null when the job has no pause time
        Calendar pause = null;
        if (jobPauseTime != null) {
            pause = Calendar.getInstance(appTz);
            pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
        }

        String action = null;
        // Throttle budget: how many more actions may be created in this run.
        int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
        int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
        // If LAST_ONLY and all materialization is in the past, ignore maxActionsToBeCreated
        boolean ignoreMaxActions =
                (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) ||
                        coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE))
                        && endMatdTime.before(new Date());
        LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
                + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);

        boolean isCronFrequency = false;

        // effStart is the nominal time of the next action to create. A frequency that does not
        // parse as an integer is treated as a cron frequency, and effStart stays at the window start.
        Calendar effStart = (Calendar) start.clone();
        try {
            int intFrequency = Integer.parseInt(coordJob.getFrequency());
            effStart = (Calendar) origStart.clone();
            effStart.add(freqTU.getCalendarUnit(), lastActionNumber * intFrequency);
        }
        catch (NumberFormatException e) {
            isCronFrequency = true;
        }

        boolean firstMater = true;
        // Note: the throttle budget is decremented as a side effect of evaluating the loop condition.
        while (effStart.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) {
            if (pause != null && effStart.compareTo(pause) >= 0) {
                break;
            }

            Date nextTime = effStart.getTime();

            if (isCronFrequency) {
                // On the first pass only, when effStart equals the window start, step back one minute
                // before asking for the next valid cron time.
                // NOTE(review): presumably so that a cron time equal to the window start itself
                // is not skipped - confirm against getNextValidActionTimeForCronFrequency.
                if (effStart.getTime().compareTo(startMatdTime) == 0 && firstMater) {
                    effStart.add(Calendar.MINUTE, -1);
                    firstMater = false;
                }

                nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob);
                effStart.setTime(nextTime);
            }

            // Re-check the window end and pause time, since the cron branch may have moved effStart.
            if (effStart.compareTo(end) < 0) {

                if (pause != null && effStart.compareTo(pause) >= 0) {
                    break;
                }
                CoordinatorActionBean actionBean = new CoordinatorActionBean();
                lastActionNumber++;

                int timeout = coordJob.getTimeout();
                LOG.debug("Materializing action for time=" + DateUtils.formatDateOozieTZ(effStart.getTime())
                        + ", lastactionnumber=" + lastActionNumber + " timeout=" + timeout + " minutes");
                Date actualTime = new Date();
                // A clone of the job XML element is passed to the helper for each instance.
                action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
                        nextTime, actualTime, lastActionNumber, jobConf, actionBean);
                actionBean.setTimeOut(timeout);

                if (!dryrun) {
                    storeToDB(actionBean, action); // Storing to table

                }
                else {
                    actionStrings.append("action for new instance");
                    actionStrings.append(action);
                }
            }
            else {
                break;
            }

            // Integer frequency: recompute from the original start rather than adding to effStart.
            if (!isCronFrequency) {
                effStart = (Calendar) origStart.clone();
                effStart.add(freqTU.getCalendarUnit(), lastActionNumber * Integer.parseInt(coordJob.getFrequency()));
            }
        }

        if (isCronFrequency) {
            // Note: this condition also decrements maxActionToBeCreated when it is evaluated.
            if (effStart.compareTo(end) < 0 && !(ignoreMaxActions || maxActionToBeCreated-- > 0)) {
                //Since we exceed the throttle, we need to move the nextMadtime forward
                //to avoid creating duplicate actions
                if (!firstMater) {
                    effStart.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob));
                }
            }
        }

        // The window end becomes the point where materialization actually stopped.
        endMatdTime = effStart.getTime();

        if (!dryrun) {
            return action;
        }
        else {
            return actionStrings.toString();
        }
    }
509
510    private void storeToDB(CoordinatorActionBean actionBean, String actionXml) throws Exception {
511        LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = "
512                + actionXml.length());
513        actionBean.setActionXml(actionXml);
514
515        insertList.add(actionBean);
516        writeActionSlaRegistration(actionXml, actionBean);
517    }
518
519    private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception {
520        Element eAction = XmlUtils.parseXml(actionXml);
521        Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
522        SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob
523                .getUser(), coordJob.getGroup(), LOG);
524        if(slaEvent != null) {
525            insertList.add(slaEvent);
526        }
527        // inserting into new table also
528        SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
529                AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false);
530    }
531
532    private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {
533        job.setLastActionTime(endMatdTime);
534        job.setLastActionNumber(lastActionNumber);
535        // if the job endtime == action endtime, we don't need to materialize this job anymore
536        Date jobEndTime = job.getEndTime();
537
538
539        if (job.getStatus() == CoordinatorJob.Status.PREP){
540            LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING");
541            job.setStatus(Job.Status.RUNNING);
542        }
543        job.setPending();
544
545        if (jobEndTime.compareTo(endMatdTime) <= 0) {
546            LOG.info("[" + job.getId() + "]: all actions have been materialized, set pending to true");
547            // set doneMaterialization to true when materialization is done
548            job.setDoneMaterialization();
549        }
550        job.setStatus(StatusUtils.getStatus(job));
551        LOG.info("Coord Job status updated to = " + job.getStatus());
552        job.setNextMaterializedTime(endMatdTime);
553    }
554
555    /* (non-Javadoc)
556     * @see org.apache.oozie.command.XCommand#getKey()
557     */
558    @Override
559    public String getKey() {
560        return getName() + "_" + jobId;
561    }
562
563    /* (non-Javadoc)
564     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
565     */
566    @Override
567    public void notifyParent() throws CommandException {
568        // update bundle action only when status changes in coord job
569        if (this.coordJob.getBundleId() != null) {
570            if (!prevStatus.equals(coordJob.getStatus())) {
571                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
572                bundleStatusUpdate.call();
573            }
574        }
575    }
576}