001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import org.apache.commons.lang.StringUtils;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.oozie.AppType;
024import org.apache.oozie.CoordinatorActionBean;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.SLAEventBean;
028import org.apache.oozie.client.CoordinatorJob;
029import org.apache.oozie.client.Job;
030import org.apache.oozie.client.SLAEvent.SlaAppType;
031import org.apache.oozie.client.rest.JsonBean;
032import org.apache.oozie.command.CommandException;
033import org.apache.oozie.command.MaterializeTransitionXCommand;
034import org.apache.oozie.command.PreconditionException;
035import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
036import org.apache.oozie.coord.CoordUtils;
037import org.apache.oozie.coord.TimeUnit;
038import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
039import org.apache.oozie.executor.jpa.BatchQueryExecutor;
040import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
041import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
042import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
043import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
044import org.apache.oozie.executor.jpa.JPAExecutorException;
045import org.apache.oozie.service.ConfigurationService;
046import org.apache.oozie.service.CoordMaterializeTriggerService;
047import org.apache.oozie.service.EventHandlerService;
048import org.apache.oozie.service.JPAService;
049import org.apache.oozie.service.Service;
050import org.apache.oozie.service.Services;
051import org.apache.oozie.sla.SLAOperations;
052import org.apache.oozie.util.DateUtils;
053import org.apache.oozie.util.Instrumentation;
054import org.apache.oozie.util.LogUtils;
055import org.apache.oozie.util.ParamChecker;
056import org.apache.oozie.util.StatusUtils;
057import org.apache.oozie.util.XConfiguration;
058import org.apache.oozie.util.XmlUtils;
059import org.apache.oozie.util.db.SLADbOperations;
060import org.jdom.Element;
061import org.jdom.JDOMException;
062
063import java.io.IOException;
064import java.io.StringReader;
065import java.sql.Timestamp;
066import java.util.Calendar;
067import java.util.Date;
068import java.util.TimeZone;
069
070/**
071 * Materialize actions for specified start and end time for coordinator job.
072 */
073@SuppressWarnings("deprecation")
074public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
075
076    private JPAService jpaService = null;
077    private CoordinatorJobBean coordJob = null;
078    private String jobId = null;
079    private Date startMatdTime = null;
080    private Date endMatdTime = null;
081    private final int materializationWindow;
082    private int lastActionNumber = 1; // over-ride by DB value
083    private CoordinatorJob.Status prevStatus = null;
084
085    static final private int lookAheadWindow = ConfigurationService.getInt(CoordMaterializeTriggerService
086            .CONF_LOOKUP_INTERVAL);
087
088    /**
089     * Default MAX timeout in minutes, after which coordinator input check will timeout
090     */
091    public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
092
093    /**
094     * The constructor for class {@link CoordMaterializeTransitionXCommand}
095     *
096     * @param jobId coordinator job id
097     * @param materializationWindow materialization window to calculate end time
098     */
099    public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) {
100        super("coord_mater", "coord_mater", 1);
101        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
102        this.materializationWindow = materializationWindow;
103    }
104
105    public CoordMaterializeTransitionXCommand(CoordinatorJobBean coordJob, int materializationWindow, Date startTime,
106                                              Date endTime) {
107        super("coord_mater", "coord_mater", 1);
108        this.jobId = ParamChecker.notEmpty(coordJob.getId(), "jobId");
109        this.materializationWindow = materializationWindow;
110        this.coordJob = coordJob;
111        this.startMatdTime = startTime;
112        this.endMatdTime = endTime;
113    }
114
115    /* (non-Javadoc)
116     * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext()
117     */
118    @Override
119    public void transitToNext() throws CommandException {
120    }
121
122    /* (non-Javadoc)
123     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
124     */
125    @Override
126    public void updateJob() throws CommandException {
127        updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE,coordJob));
128    }
129
130    /* (non-Javadoc)
131     * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites()
132     */
133    @Override
134    public void performWrites() throws CommandException {
135        try {
136            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
137            // register the partition related dependencies of actions
138            for (JsonBean actionBean : insertList) {
139                if (actionBean instanceof CoordinatorActionBean) {
140                    CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
141                    if (EventHandlerService.isEnabled()) {
142                        CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
143                    }
144
145                    // TODO: time 100s should be configurable
146                    queue(new CoordActionNotificationXCommand(coordAction), 100);
147
148                    //Delay for input check = (nominal time - now)
149                    long checkDelay = coordAction.getNominalTime().getTime() - new Date().getTime();
150                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
151                        Math.max(checkDelay, 0));
152
153                    if (!StringUtils.isEmpty(coordAction.getPushMissingDependencies())) {
154                        // TODO: Delay in catchup mode?
155                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
156                    }
157                }
158            }
159        }
160        catch (JPAExecutorException jex) {
161            throw new CommandException(jex);
162        }
163    }
164
165    /* (non-Javadoc)
166     * @see org.apache.oozie.command.XCommand#getEntityKey()
167     */
168    @Override
169    public String getEntityKey() {
170        return this.jobId;
171    }
172
173    @Override
174    protected boolean isLockRequired() {
175        return true;
176    }
177
178    /* (non-Javadoc)
179     * @see org.apache.oozie.command.XCommand#loadState()
180     */
181    @Override
182    protected void loadState() throws CommandException {
183        jpaService = Services.get().get(JPAService.class);
184        if (jpaService == null) {
185            LOG.error(ErrorCode.E0610);
186        }
187
188        try {
189            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_MATERIALIZE, jobId);
190            prevStatus = coordJob.getStatus();
191        }
192        catch (JPAExecutorException jex) {
193            throw new CommandException(jex);
194        }
195
196        // calculate start materialize and end materialize time
197        calcMatdTime();
198
199        LogUtils.setLogInfo(coordJob);
200    }
201
202    /**
203     * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null
204     *
205     * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime
206     */
207    protected void calcMatdTime() throws CommandException {
208        Timestamp startTime = coordJob.getNextMaterializedTimestamp();
209        if (startTime == null) {
210            startTime = coordJob.getStartTimestamp();
211        }
212        // calculate end time by adding materializationWindow to start time.
213        // need to convert materializationWindow from secs to milliseconds
214        long startTimeMilli = startTime.getTime();
215        long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
216
217        startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli));
218        endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli));
219        endMatdTime = getMaterializationTimeForCatchUp(endMatdTime);
220        // if MaterializationWindow end time is greater than endTime
221        // for job, then set it to endTime of job
222        Date jobEndTime = coordJob.getEndTime();
223        if (endMatdTime.compareTo(jobEndTime) > 0) {
224            endMatdTime = jobEndTime;
225        }
226
227        LOG.debug("Materializing coord job id=" + jobId + ", start=" + DateUtils.formatDateOozieTZ(startMatdTime) + ", end=" + DateUtils.formatDateOozieTZ(endMatdTime)
228                + ", window=" + materializationWindow);
229    }
230
231    /**
232     * Get materialization for window for catch-up jobs. for current jobs,it reruns currentMatdate, For catch-up, end
233     * Mataterilized Time = startMatdTime + MatThrottling * frequency; unless LAST_ONLY execution order is set, in which
234     * case it returns now (to materialize all actions in the past)
235     *
236     * @param currentMatTime
237     * @return
238     * @throws CommandException
239     * @throws JDOMException
240     */
241    private Date getMaterializationTimeForCatchUp(Date currentMatTime) throws CommandException {
242        if (currentMatTime.after(new Date())) {
243            return currentMatTime;
244        }
245        if (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) ||
246                coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE)) {
247            return new Date();
248        }
249        int frequency = 0;
250        try {
251            frequency = Integer.parseInt(coordJob.getFrequency());
252        }
253        catch (NumberFormatException e) {
254            return currentMatTime;
255        }
256
257        TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
258        TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
259        Calendar startInstance = Calendar.getInstance(appTz);
260        startInstance.setTime(startMatdTime);
261        Calendar endMatInstance = null;
262        Calendar previousInstance = startInstance;
263        for (int i = 1; i <= coordJob.getMatThrottling(); i++) {
264            endMatInstance = (Calendar) startInstance.clone();
265            endMatInstance.add(freqTU.getCalendarUnit(), i * frequency);
266            if (endMatInstance.getTime().compareTo(new Date()) >= 0) {
267                if (previousInstance.getTime().after(currentMatTime)) {
268                    return previousInstance.getTime();
269                }
270                else {
271                    return currentMatTime;
272                }
273            }
274            previousInstance = endMatInstance;
275        }
276        if (endMatInstance == null) {
277            return currentMatTime;
278        }
279        else {
280            return endMatInstance.getTime();
281        }
282    }
283
284    /* (non-Javadoc)
285     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
286     */
287    @Override
288    protected void verifyPrecondition() throws CommandException, PreconditionException {
289        if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING
290                || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) {
291            throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
292                    + " job is not in PREP or RUNNING but in " + coordJob.getStatus());
293        }
294
295        if (coordJob.isDoneMaterialization()) {
296            throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId =" + jobId
297                    + " job is already materialized");
298        }
299
300        if (coordJob.getNextMaterializedTimestamp() != null
301                && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) {
302            throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
303                    + " job is already materialized");
304        }
305
306        Timestamp startTime = coordJob.getNextMaterializedTimestamp();
307        if (startTime == null) {
308            startTime = coordJob.getStartTimestamp();
309
310            if (startTime.after(new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) {
311                throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId="
312                        + jobId + " job's start time is not reached yet - nothing to materialize");
313            }
314        }
315
316        if (coordJob.getNextMaterializedTimestamp() != null
317                && coordJob.getNextMaterializedTimestamp().after(
318                        new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) {
319            throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
320                    + " Request is for future time. Lookup time is  "
321                    + new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000) + " mat time is "
322                    + coordJob.getNextMaterializedTimestamp());
323        }
324
325        if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) {
326            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
327                    + ", all actions have been materialized from start time = " + coordJob.getStartTime()
328                    + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr());
329        }
330
331        if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) {
332            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
333                    + ", action is *already* materialized for Materialization start time = " + startMatdTime
334                    + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr());
335        }
336
337        if (endMatdTime.after(coordJob.getEndTime())) {
338            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
339                    + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = "
340                    + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr());
341        }
342
343        if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) {
344            throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
345                    + ", materialization start time = " + startMatdTime
346                    + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime()
347                    + ", job status = " + coordJob.getStatusStr());
348        }
349
350    }
351
352    /* (non-Javadoc)
353     * @see org.apache.oozie.command.MaterializeTransitionXCommand#materialize()
354     */
355    @Override
356    protected void materialize() throws CommandException {
357        Instrumentation.Cron cron = new Instrumentation.Cron();
358        cron.start();
359        try {
360            materializeActions(false);
361            updateJobMaterializeInfo(coordJob);
362        }
363        catch (CommandException ex) {
364            LOG.warn("Exception occurred:" + ex.getMessage() + " Making the job failed ", ex);
365            coordJob.setStatus(Job.Status.FAILED);
366            coordJob.resetPending();
367            // remove any materialized actions and slaEvents
368            insertList.clear();
369        }
370        catch (Exception e) {
371            LOG.error("Exception occurred:" + e.getMessage() + " Making the job failed ", e);
372            coordJob.setStatus(Job.Status.FAILED);
373            try {
374                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, coordJob);
375            }
376            catch (JPAExecutorException jex) {
377                throw new CommandException(ErrorCode.E1011, jex);
378            }
379            throw new CommandException(ErrorCode.E1012, e.getMessage(), e);
380        } finally {
381            cron.stop();
382            instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".materialize", cron);
383        }
384    }
385
386    /**
387     * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table.
388     *
389     * @param dryrun if this is a dry run
390     * @throws Exception thrown if failed to materialize actions
391     */
392    protected String materializeActions(boolean dryrun) throws Exception {
393
394        Configuration jobConf = null;
395        try {
396            jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
397        }
398        catch (IOException ioe) {
399            LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
400            throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
401        }
402
403        String jobXml = coordJob.getJobXml();
404        Element eJob = XmlUtils.parseXml(jobXml);
405        TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
406
407        String frequency = coordJob.getFrequency();
408        TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
409        TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
410        Calendar start = Calendar.getInstance(appTz);
411        start.setTime(startMatdTime);
412        DateUtils.moveToEnd(start, endOfFlag);
413        Calendar end = Calendar.getInstance(appTz);
414        end.setTime(endMatdTime);
415        lastActionNumber = coordJob.getLastActionNumber();
416        //Intentionally printing dates in their own timezone, not Oozie timezone
417        LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end="
418                + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":"
419                + freqTU + ",\n lastActionNumber " + lastActionNumber);
420        // Keep the actual start time
421        Calendar origStart = Calendar.getInstance(appTz);
422        origStart.setTime(coordJob.getStartTimestamp());
423        // Move to the End of duration, if needed.
424        DateUtils.moveToEnd(origStart, endOfFlag);
425
426        StringBuilder actionStrings = new StringBuilder();
427        Date jobPauseTime = coordJob.getPauseTime();
428        Calendar pause = null;
429        if (jobPauseTime != null) {
430            pause = Calendar.getInstance(appTz);
431            pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
432        }
433
434        String action = null;
435        int numWaitingActions = dryrun ? 0 : jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
436        int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
437        // If LAST_ONLY and all materialization is in the past, ignore maxActionsToBeCreated
438        boolean ignoreMaxActions =
439                (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) ||
440                        coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE))
441                        && endMatdTime.before(new Date());
442        LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
443                + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);
444
445        boolean isCronFrequency = false;
446
447        Calendar effStart = (Calendar) start.clone();
448        try {
449            int intFrequency = Integer.parseInt(coordJob.getFrequency());
450            effStart = (Calendar) origStart.clone();
451            effStart.add(freqTU.getCalendarUnit(), lastActionNumber * intFrequency);
452        }
453        catch (NumberFormatException e) {
454            isCronFrequency = true;
455        }
456
457        boolean firstMater = true;
458        while (effStart.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) {
459            if (pause != null && effStart.compareTo(pause) >= 0) {
460                break;
461            }
462
463            Date nextTime = effStart.getTime();
464
465            if (isCronFrequency) {
466                if (effStart.getTime().compareTo(startMatdTime) == 0 && firstMater) {
467                    effStart.add(Calendar.MINUTE, -1);
468                    firstMater = false;
469                }
470
471                nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob);
472                effStart.setTime(nextTime);
473            }
474
475            if (effStart.compareTo(end) < 0) {
476
477                if (pause != null && effStart.compareTo(pause) >= 0) {
478                    break;
479                }
480                CoordinatorActionBean actionBean = new CoordinatorActionBean();
481                lastActionNumber++;
482
483                int timeout = coordJob.getTimeout();
484                LOG.debug("Materializing action for time=" + DateUtils.formatDateOozieTZ(effStart.getTime())
485                        + ", lastactionnumber=" + lastActionNumber + " timeout=" + timeout + " minutes");
486                Date actualTime = new Date();
487                action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
488                        nextTime, actualTime, lastActionNumber, jobConf, actionBean);
489                actionBean.setTimeOut(timeout);
490                if (!dryrun) {
491                    storeToDB(actionBean, action, jobConf); // Storing to table
492
493                }
494                else {
495                    actionStrings.append("action for new instance");
496                    actionStrings.append(action);
497                }
498            }
499            else {
500                break;
501            }
502
503            if (!isCronFrequency) {
504                effStart = (Calendar) origStart.clone();
505                effStart.add(freqTU.getCalendarUnit(), lastActionNumber * Integer.parseInt(coordJob.getFrequency()));
506            }
507        }
508
509        if (isCronFrequency) {
510            if (effStart.compareTo(end) < 0 && !(ignoreMaxActions || maxActionToBeCreated-- > 0)) {
511                //Since we exceed the throttle, we need to move the nextMadtime forward
512                //to avoid creating duplicate actions
513                if (!firstMater) {
514                    effStart.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob));
515                }
516            }
517        }
518
519        endMatdTime = effStart.getTime();
520
521        if (!dryrun) {
522            return action;
523        }
524        else {
525            return actionStrings.toString();
526        }
527    }
528
529    private void storeToDB(CoordinatorActionBean actionBean, String actionXml, Configuration jobConf) throws Exception {
530        LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = "
531                + actionXml.length());
532        actionBean.setActionXml(actionXml);
533        insertList.add(actionBean);
534        writeActionSlaRegistration(actionXml, actionBean, jobConf);
535    }
536
537    private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean, Configuration jobConf)
538            throws Exception {
539        Element eAction = XmlUtils.parseXml(actionXml);
540        Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
541                SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
542                                 SlaAppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getGroup(), LOG);
543                         if (slaEvent != null) {
544            insertList.add(slaEvent);
545        }
546        // inserting into new table also
547        SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
548                AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false,
549                CoordUtils.isSlaAlertDisabled(actionBean, coordJob.getAppName(), jobConf));
550    }
551
552    private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {
553        job.setLastActionTime(endMatdTime);
554        job.setLastActionNumber(lastActionNumber);
555        // if the job endtime == action endtime, we don't need to materialize this job anymore
556        Date jobEndTime = job.getEndTime();
557
558
559        if (job.getStatus() == CoordinatorJob.Status.PREP){
560            LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING");
561            job.setStatus(Job.Status.RUNNING);
562        }
563        job.setPending();
564
565        if (jobEndTime.compareTo(endMatdTime) <= 0) {
566            LOG.info("[" + job.getId() + "]: all actions have been materialized, set pending to true");
567            // set doneMaterialization to true when materialization is done
568            job.setDoneMaterialization();
569        }
570        job.setStatus(StatusUtils.getStatus(job));
571        LOG.info("Coord Job status updated to = " + job.getStatus());
572        job.setNextMaterializedTime(endMatdTime);
573    }
574
575    /* (non-Javadoc)
576     * @see org.apache.oozie.command.XCommand#getKey()
577     */
578    @Override
579    public String getKey() {
580        return getName() + "_" + jobId;
581    }
582
583    /* (non-Javadoc)
584     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
585     */
586    @Override
587    public void notifyParent() throws CommandException {
588        // update bundle action only when status changes in coord job
589        if (this.coordJob.getBundleId() != null) {
590            if (!prevStatus.equals(coordJob.getStatus())) {
591                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
592                bundleStatusUpdate.call();
593            }
594        }
595    }
596}