001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.sql.Timestamp;
023    import java.util.Calendar;
024    import java.util.Date;
025    import java.util.TimeZone;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.oozie.AppType;
029    import org.apache.oozie.CoordinatorActionBean;
030    import org.apache.oozie.CoordinatorJobBean;
031    import org.apache.oozie.ErrorCode;
032    import org.apache.oozie.SLAEventBean;
033    import org.apache.oozie.client.CoordinatorJob;
034    import org.apache.oozie.client.Job;
035    import org.apache.oozie.client.SLAEvent.SlaAppType;
036    import org.apache.oozie.client.rest.JsonBean;
037    import org.apache.oozie.command.CommandException;
038    import org.apache.oozie.command.MaterializeTransitionXCommand;
039    import org.apache.oozie.command.PreconditionException;
040    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
041    import org.apache.oozie.coord.TimeUnit;
042    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
043    import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
044    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
045    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
046    import org.apache.oozie.executor.jpa.JPAExecutorException;
047    import org.apache.oozie.service.EventHandlerService;
048    import org.apache.oozie.service.JPAService;
049    import org.apache.oozie.service.Service;
050    import org.apache.oozie.service.Services;
051    import org.apache.oozie.util.DateUtils;
052    import org.apache.oozie.util.Instrumentation;
053    import org.apache.oozie.util.LogUtils;
054    import org.apache.oozie.util.ParamChecker;
055    import org.apache.oozie.sla.SLAOperations;
056    import org.apache.oozie.util.StatusUtils;
057    import org.apache.oozie.util.XConfiguration;
058    import org.apache.oozie.util.XmlUtils;
059    import org.apache.oozie.util.db.SLADbOperations;
060    import org.jdom.Element;
061    
062    /**
063     * Materialize actions for specified start and end time for coordinator job.
064     */
065    @SuppressWarnings("deprecation")
066    public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
067        private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization;
068        private JPAService jpaService = null;
069        private CoordinatorJobBean coordJob = null;
070        private String jobId = null;
071        private Date startMatdTime = null;
072        private Date endMatdTime = null;
073        private final int materializationWindow;
074        private int lastActionNumber = 1; // over-ride by DB value
075        private CoordinatorJob.Status prevStatus = null;
076        /**
077         * Default MAX timeout in minutes, after which coordinator input check will timeout
078         */
079        public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
080    
081        /**
082         * The constructor for class {@link CoordMaterializeTransitionXCommand}
083         *
084         * @param jobId coordinator job id
085         * @param materializationWindow materialization window to calculate end time
086         */
087        public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) {
088            super("coord_mater", "coord_mater", 1);
089            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
090            this.materializationWindow = materializationWindow;
091        }
092    
093        /* (non-Javadoc)
094         * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext()
095         */
096        @Override
097        public void transitToNext() throws CommandException {
098        }
099    
100        /* (non-Javadoc)
101         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
102         */
103        @Override
104        public void updateJob() throws CommandException {
105            updateList.add(coordJob);
106        }
107    
108        /* (non-Javadoc)
109         * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites()
110         */
111        @Override
112        public void performWrites() throws CommandException {
113            try {
114                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
115                // register the partition related dependencies of actions
116                for (JsonBean actionBean : insertList) {
117                    if (actionBean instanceof CoordinatorActionBean) {
118                        CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
119                        if (EventHandlerService.isEnabled()) {
120                            CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
121                        }
122                        if (coordAction.getPushMissingDependencies() != null) {
123                            // TODO: Delay in catchup mode?
124                            queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
125                        }
126                    }
127                }
128            }
129            catch (JPAExecutorException jex) {
130                throw new CommandException(jex);
131            }
132        }
133    
134        /* (non-Javadoc)
135         * @see org.apache.oozie.command.XCommand#getEntityKey()
136         */
137        @Override
138        public String getEntityKey() {
139            return this.jobId;
140        }
141    
142        @Override
143        protected boolean isLockRequired() {
144            return true;
145        }
146    
147        /* (non-Javadoc)
148         * @see org.apache.oozie.command.XCommand#loadState()
149         */
150        @Override
151        protected void loadState() throws CommandException {
152            jpaService = Services.get().get(JPAService.class);
153            if (jpaService == null) {
154                LOG.error(ErrorCode.E0610);
155            }
156    
157            try {
158                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
159                prevStatus = coordJob.getStatus();
160            }
161            catch (JPAExecutorException jex) {
162                throw new CommandException(jex);
163            }
164    
165            // calculate start materialize and end materialize time
166            calcMatdTime();
167    
168            LogUtils.setLogInfo(coordJob, logInfo);
169        }
170    
171        /**
172         * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null
173         *
174         * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime
175         */
176        protected void calcMatdTime() throws CommandException {
177            Timestamp startTime = coordJob.getNextMaterializedTimestamp();
178            if (startTime == null) {
179                startTime = coordJob.getStartTimestamp();
180            }
181            // calculate end time by adding materializationWindow to start time.
182            // need to convert materializationWindow from secs to milliseconds
183            long startTimeMilli = startTime.getTime();
184            long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
185    
186            startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli));
187            endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli));
188            // if MaterializationWindow end time is greater than endTime
189            // for job, then set it to endTime of job
190            Date jobEndTime = coordJob.getEndTime();
191            if (endMatdTime.compareTo(jobEndTime) > 0) {
192                endMatdTime = jobEndTime;
193            }
194    
195            LOG.debug("Materializing coord job id=" + jobId + ", start=" + startMatdTime + ", end=" + endMatdTime
196                    + ", window=" + materializationWindow);
197        }
198    
199        /* (non-Javadoc)
200         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
201         */
202        @Override
203        protected void verifyPrecondition() throws CommandException, PreconditionException {
204            if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING
205                    || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) {
206                throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
207                        + " job is not in PREP or RUNNING but in " + coordJob.getStatus());
208            }
209    
210            if (coordJob.isDoneMaterialization()) {
211                throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId =" + jobId
212                        + " job is already materialized");
213            }
214    
215            if (coordJob.getNextMaterializedTimestamp() != null
216                    && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) {
217                throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
218                        + " job is already materialized");
219            }
220    
221            Timestamp startTime = coordJob.getNextMaterializedTimestamp();
222            if (startTime == null) {
223                startTime = coordJob.getStartTimestamp();
224    
225                if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) {
226                    throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId="
227                            + jobId + " job's start time is not reached yet - nothing to materialize");
228                }
229            }
230    
231            if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) {
232                throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
233                        + ", all actions have been materialized from start time = " + coordJob.getStartTime()
234                        + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr());
235            }
236    
237            if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) {
238                throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
239                        + ", action is *already* materialized for Materialization start time = " + startMatdTime
240                        + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr());
241            }
242    
243            if (endMatdTime.after(coordJob.getEndTime())) {
244                throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
245                        + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = "
246                        + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr());
247            }
248    
249            if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) {
250                throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
251                        + ", materialization start time = " + startMatdTime
252                        + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime()
253                        + ", job status = " + coordJob.getStatusStr());
254            }
255    
256        }
257    
258        /* (non-Javadoc)
259         * @see org.apache.oozie.command.MaterializeTransitionXCommand#materialize()
260         */
261        @Override
262        protected void materialize() throws CommandException {
263            Instrumentation.Cron cron = new Instrumentation.Cron();
264            cron.start();
265            try {
266                materializeActions(false);
267                updateJobMaterializeInfo(coordJob);
268            }
269            catch (CommandException ex) {
270                LOG.warn("Exception occurred:" + ex.getMessage() + " Making the job failed ", ex);
271                coordJob.setStatus(Job.Status.FAILED);
272                coordJob.resetPending();
273                // remove any materialized actions and slaEvents
274                insertList.clear();
275            }
276            catch (Exception e) {
277                LOG.error("Exception occurred:" + e.getMessage() + " Making the job failed ", e);
278                coordJob.setStatus(Job.Status.FAILED);
279                try {
280                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
281                }
282                catch (JPAExecutorException jex) {
283                    throw new CommandException(ErrorCode.E1011, jex);
284                }
285                throw new CommandException(ErrorCode.E1012, e.getMessage(), e);
286            }
287            cron.stop();
288    
289        }
290    
291        /**
292         * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table.
293         *
294         * @param dryrun if this is a dry run
295         * @throws Exception thrown if failed to materialize actions
296         */
297        protected String materializeActions(boolean dryrun) throws Exception {
298    
299            Configuration jobConf = null;
300            try {
301                jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
302            }
303            catch (IOException ioe) {
304                LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
305                throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
306            }
307    
308            String jobXml = coordJob.getJobXml();
309            Element eJob = XmlUtils.parseXml(jobXml);
310            TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
311            int frequency = Integer.valueOf(coordJob.getFrequency());
312            TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
313            TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
314            Calendar start = Calendar.getInstance(appTz);
315            start.setTime(startMatdTime);
316            DateUtils.moveToEnd(start, endOfFlag);
317            Calendar end = Calendar.getInstance(appTz);
318            end.setTime(endMatdTime);
319            lastActionNumber = coordJob.getLastActionNumber();
320            LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end="
321                    + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":"
322                    + freqTU + ",\n lastActionNumber " + lastActionNumber);
323            // Keep the actual start time
324            Calendar origStart = Calendar.getInstance(appTz);
325            origStart.setTime(coordJob.getStartTimestamp());
326            // Move to the End of duration, if needed.
327            DateUtils.moveToEnd(origStart, endOfFlag);
328            // Cloning the start time to be used in loop iteration
329            Calendar effStart = (Calendar) origStart.clone();
330            // Move the time when the previous action finished
331            effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
332    
333            StringBuilder actionStrings = new StringBuilder();
334            Date jobPauseTime = coordJob.getPauseTime();
335            Calendar pause = null;
336            if (jobPauseTime != null) {
337                pause = Calendar.getInstance(appTz);
338                pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
339            }
340    
341            String action = null;
342            JPAService jpaService = Services.get().get(JPAService.class);
343            int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
344            int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
345            LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
346                    + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);
347    
348            while (effStart.compareTo(end) < 0 && maxActionToBeCreated-- > 0) {
349                if (pause != null && effStart.compareTo(pause) >= 0) {
350                    break;
351                }
352                CoordinatorActionBean actionBean = new CoordinatorActionBean();
353                lastActionNumber++;
354    
355                int timeout = coordJob.getTimeout();
356                LOG.debug("Materializing action for time=" + effStart.getTime() + ", lastactionnumber=" + lastActionNumber
357                        + " timeout=" + timeout + " minutes");
358                Date actualTime = new Date();
359                action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
360                        effStart.getTime(), actualTime, lastActionNumber, jobConf, actionBean);
361                actionBean.setTimeOut(timeout);
362    
363                if (!dryrun) {
364                    storeToDB(actionBean, action); // Storing to table
365    
366                }
367                else {
368                    actionStrings.append("action for new instance");
369                    actionStrings.append(action);
370                }
371                // Restore the original start time
372                effStart = (Calendar) origStart.clone();
373                effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
374            }
375    
376            endMatdTime = new Date(effStart.getTimeInMillis());
377            if (!dryrun) {
378                return action;
379            }
380            else {
381                return actionStrings.toString();
382            }
383        }
384    
385        private void storeToDB(CoordinatorActionBean actionBean, String actionXml) throws Exception {
386            LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = "
387                    + actionXml.length());
388            actionBean.setActionXml(actionXml);
389    
390            insertList.add(actionBean);
391            writeActionSlaRegistration(actionXml, actionBean);
392    
393            // TODO: time 100s should be configurable
394            queue(new CoordActionNotificationXCommand(actionBean), 100);
395            queue(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100);
396        }
397    
398        private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception {
399            Element eAction = XmlUtils.parseXml(actionXml);
400            Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
401            SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob
402                    .getUser(), coordJob.getGroup(), LOG);
403            if(slaEvent != null) {
404                insertList.add(slaEvent);
405            }
406            // inserting into new table also
407            SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
408                    AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false);
409        }
410    
411        private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {
412            job.setLastActionTime(endMatdTime);
413            job.setLastActionNumber(lastActionNumber);
414            // if the job endtime == action endtime, we don't need to materialize this job anymore
415            Date jobEndTime = job.getEndTime();
416    
417    
418            if (job.getStatus() == CoordinatorJob.Status.PREP){
419                LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING");
420                job.setStatus(Job.Status.RUNNING);
421            }
422            job.setPending();
423    
424            if (jobEndTime.compareTo(endMatdTime) <= 0) {
425                LOG.info("[" + job.getId() + "]: all actions have been materialized, job status = " + job.getStatus()
426                        + ", set pending to true");
427                // set doneMaterialization to true when materialization is done
428                job.setDoneMaterialization();
429            }
430            job.setStatus(StatusUtils.getStatus(job));
431            job.setNextMaterializedTime(endMatdTime);
432        }
433    
434        /* (non-Javadoc)
435         * @see org.apache.oozie.command.XCommand#getKey()
436         */
437        @Override
438        public String getKey() {
439            return getName() + "_" + jobId;
440        }
441    
442        /* (non-Javadoc)
443         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
444         */
445        @Override
446        public void notifyParent() throws CommandException {
447            // update bundle action only when status changes in coord job
448            if (this.coordJob.getBundleId() != null) {
449                if (!prevStatus.equals(coordJob.getStatus())) {
450                    BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
451                    bundleStatusUpdate.call();
452                }
453            }
454        }
455    }