001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.sql.Timestamp;
023    import java.util.Calendar;
024    import java.util.Date;
025    import java.util.TimeZone;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.oozie.CoordinatorActionBean;
029    import org.apache.oozie.CoordinatorJobBean;
030    import org.apache.oozie.ErrorCode;
031    import org.apache.oozie.SLAEventBean;
032    import org.apache.oozie.client.CoordinatorJob;
033    import org.apache.oozie.client.Job;
034    import org.apache.oozie.client.SLAEvent.SlaAppType;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.command.MaterializeTransitionXCommand;
037    import org.apache.oozie.command.PreconditionException;
038    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
039    import org.apache.oozie.coord.TimeUnit;
040    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
041    import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
042    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
043    import org.apache.oozie.executor.jpa.JPAExecutorException;
044    import org.apache.oozie.service.JPAService;
045    import org.apache.oozie.service.Service;
046    import org.apache.oozie.service.Services;
047    import org.apache.oozie.util.DateUtils;
048    import org.apache.oozie.util.Instrumentation;
049    import org.apache.oozie.util.LogUtils;
050    import org.apache.oozie.util.ParamChecker;
051    import org.apache.oozie.util.StatusUtils;
052    import org.apache.oozie.util.XConfiguration;
053    import org.apache.oozie.util.XmlUtils;
054    import org.apache.oozie.util.db.SLADbOperations;
055    import org.jdom.Element;
056    
057    /**
058     * Materialize actions for specified start and end time for coordinator job.
059     */
060    public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
061        private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization;
062        private JPAService jpaService = null;
063        private CoordinatorJobBean coordJob = null;
064        private String jobId = null;
065        private Date startMatdTime = null;
066        private Date endMatdTime = null;
067        private final int materializationWindow;
068        private int lastActionNumber = 1; // over-ride by DB value
069        private CoordinatorJob.Status prevStatus = null;
070        /**
071         * Default MAX timeout in minutes, after which coordinator input check will timeout
072         */
073        public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
074    
075        /**
076         * The constructor for class {@link CoordMaterializeTransitionXCommand}
077         *
078         * @param jobId coordinator job id
079         * @param materializationWindow materialization window to calculate end time
080         */
081        public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) {
082            super("coord_mater", "coord_mater", 1);
083            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
084            this.materializationWindow = materializationWindow;
085        }
086    
087        /* (non-Javadoc)
088         * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext()
089         */
090        @Override
091        public void transitToNext() throws CommandException {
092        }
093    
094        /* (non-Javadoc)
095         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
096         */
097        @Override
098        public void updateJob() throws CommandException {
099            updateList.add(coordJob);
100        }
101    
102        /* (non-Javadoc)
103         * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites()
104         */
105        @Override
106        public void performWrites() throws CommandException {
107            try {
108                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
109            }
110            catch (JPAExecutorException jex) {
111                throw new CommandException(jex);
112            }
113        }
114    
115        /* (non-Javadoc)
116         * @see org.apache.oozie.command.XCommand#getEntityKey()
117         */
118        @Override
119        public String getEntityKey() {
120            return this.jobId;
121        }
122    
123        @Override
124        protected boolean isLockRequired() {
125            return true;
126        }
127    
128        /* (non-Javadoc)
129         * @see org.apache.oozie.command.XCommand#loadState()
130         */
131        @Override
132        protected void loadState() throws CommandException {
133            jpaService = Services.get().get(JPAService.class);
134            if (jpaService == null) {
135                LOG.error(ErrorCode.E0610);
136            }
137    
138            try {
139                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
140                prevStatus = coordJob.getStatus();
141            }
142            catch (JPAExecutorException jex) {
143                throw new CommandException(jex);
144            }
145    
146            // calculate start materialize and end materialize time
147            calcMatdTime();
148    
149            LogUtils.setLogInfo(coordJob, logInfo);
150        }
151    
152        /**
153         * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null
154         *
155         * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime
156         */
157        protected void calcMatdTime() throws CommandException {
158            Timestamp startTime = coordJob.getNextMaterializedTimestamp();
159            if (startTime == null) {
160                startTime = coordJob.getStartTimestamp();
161            }
162            // calculate end time by adding materializationWindow to start time.
163            // need to convert materializationWindow from secs to milliseconds
164            long startTimeMilli = startTime.getTime();
165            long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
166    
167            startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli));
168            endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli));
169            // if MaterializationWindow end time is greater than endTime
170            // for job, then set it to endTime of job
171            Date jobEndTime = coordJob.getEndTime();
172            if (endMatdTime.compareTo(jobEndTime) > 0) {
173                endMatdTime = jobEndTime;
174            }
175    
176            LOG.debug("Materializing coord job id=" + jobId + ", start=" + startMatdTime + ", end=" + endMatdTime
177                    + ", window=" + materializationWindow);
178        }
179    
180        /* (non-Javadoc)
181         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
182         */
183        @Override
184        protected void verifyPrecondition() throws CommandException, PreconditionException {
185            if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING
186                    || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) {
187                throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
188                        + " job is not in PREP or RUNNING but in " + coordJob.getStatus());
189            }
190    
191            if (coordJob.getNextMaterializedTimestamp() != null
192                    && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) {
193                throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
194                        + " job is already materialized");
195            }
196    
197            Timestamp startTime = coordJob.getNextMaterializedTimestamp();
198            if (startTime == null) {
199                startTime = coordJob.getStartTimestamp();
200    
201                if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) {
202                    throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId="
203                            + jobId + " job's start time is not reached yet - nothing to materialize");
204                }
205            }
206    
207            if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) {
208                throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
209                        + ", all actions have been materialized from start time = " + coordJob.getStartTime()
210                        + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr());
211            }
212    
213            if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) {
214                throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
215                        + ", action is *already* materialized for Materialization start time = " + startMatdTime
216                        + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr());
217            }
218    
219            if (endMatdTime.after(coordJob.getEndTime())) {
220                throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
221                        + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = "
222                        + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr());
223            }
224    
225            if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) {
226                throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
227                        + ", materialization start time = " + startMatdTime
228                        + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime()
229                        + ", job status = " + coordJob.getStatusStr());
230            }
231    
232        }
233    
234        /* (non-Javadoc)
235         * @see org.apache.oozie.command.MaterializeTransitionXCommand#materialize()
236         */
237        @Override
238        protected void materialize() throws CommandException {
239            Instrumentation.Cron cron = new Instrumentation.Cron();
240            cron.start();
241            try {
242                materializeActions(false);
243                updateJobMaterializeInfo(coordJob);
244            }
245            catch (CommandException ex) {
246                LOG.warn("Exception occurs:" + ex.getMessage() + " Making the job failed ", ex);
247                coordJob.setStatus(Job.Status.FAILED);
248                coordJob.resetPending();
249            }
250            catch (Exception e) {
251                LOG.error("Excepion thrown :", e);
252                throw new CommandException(ErrorCode.E1001, e.getMessage(), e);
253            }
254            cron.stop();
255    
256        }
257    
258        /**
259         * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table.
260         *
261         * @param dryrun if this is a dry run
262         * @throws Exception thrown if failed to materialize actions
263         */
264        protected String materializeActions(boolean dryrun) throws Exception {
265    
266            Configuration jobConf = null;
267            try {
268                jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
269            }
270            catch (IOException ioe) {
271                LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
272                throw new CommandException(ErrorCode.E1005, ioe);
273            }
274    
275            String jobXml = coordJob.getJobXml();
276            Element eJob = XmlUtils.parseXml(jobXml);
277            TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
278            int frequency = coordJob.getFrequency();
279            TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
280            TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
281            Calendar start = Calendar.getInstance(appTz);
282            start.setTime(startMatdTime);
283            DateUtils.moveToEnd(start, endOfFlag);
284            Calendar end = Calendar.getInstance(appTz);
285            end.setTime(endMatdTime);
286            lastActionNumber = coordJob.getLastActionNumber();
287            LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end="
288                    + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":"
289                    + freqTU + ",\n lastActionNumber " + lastActionNumber);
290            // Keep the actual start time
291            Calendar origStart = Calendar.getInstance(appTz);
292            origStart.setTime(coordJob.getStartTimestamp());
293            // Move to the End of duration, if needed.
294            DateUtils.moveToEnd(origStart, endOfFlag);
295            // Cloning the start time to be used in loop iteration
296            Calendar effStart = (Calendar) origStart.clone();
297            // Move the time when the previous action finished
298            effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
299    
300            StringBuilder actionStrings = new StringBuilder();
301            Date jobPauseTime = coordJob.getPauseTime();
302            Calendar pause = null;
303            if (jobPauseTime != null) {
304                pause = Calendar.getInstance(appTz);
305                pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
306            }
307    
308            String action = null;
309            JPAService jpaService = Services.get().get(JPAService.class);
310            int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
311            int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
312            LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
313                    + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);
314    
315            while (effStart.compareTo(end) < 0 && maxActionToBeCreated-- > 0) {
316                if (pause != null && effStart.compareTo(pause) >= 0) {
317                    break;
318                }
319                CoordinatorActionBean actionBean = new CoordinatorActionBean();
320                lastActionNumber++;
321    
322                int timeout = coordJob.getTimeout();
323                LOG.debug("Materializing action for time=" + effStart.getTime() + ", lastactionnumber=" + lastActionNumber
324                        + " timeout=" + timeout + " minutes");
325                Date actualTime = new Date();
326                action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
327                        effStart.getTime(), actualTime, lastActionNumber, jobConf, actionBean);
328                actionBean.setTimeOut(timeout);
329    
330                if (!dryrun) {
331                    storeToDB(actionBean, action); // Storing to table
332                }
333                else {
334                    actionStrings.append("action for new instance");
335                    actionStrings.append(action);
336                }
337                // Restore the original start time
338                effStart = (Calendar) origStart.clone();
339                effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
340            }
341    
342            endMatdTime = new Date(effStart.getTimeInMillis());
343            if (!dryrun) {
344                return action;
345            }
346            else {
347                return actionStrings.toString();
348            }
349        }
350    
351        private void storeToDB(CoordinatorActionBean actionBean, String actionXml) throws Exception {
352            LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = "
353                    + actionXml.length());
354            actionBean.setActionXml(actionXml);
355    
356            insertList.add(actionBean);
357            writeActionRegistration(actionXml, actionBean);
358    
359            // TODO: time 100s should be configurable
360            queue(new CoordActionNotificationXCommand(actionBean), 100);
361            queue(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100);
362        }
363    
364        private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception {
365            Element eAction = XmlUtils.parseXml(actionXml);
366            Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
367            SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob
368                    .getUser(), coordJob.getGroup(), LOG);
369            if(slaEvent != null) {
370                insertList.add(slaEvent);
371            }
372        }
373    
374        private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {
375            job.setLastActionTime(endMatdTime);
376            job.setLastActionNumber(lastActionNumber);
377            // if the job endtime == action endtime, we don't need to materialize this job anymore
378            Date jobEndTime = job.getEndTime();
379    
380    
381            if (job.getStatus() == CoordinatorJob.Status.PREP){
382                LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING");
383                job.setStatus(Job.Status.RUNNING);
384            }
385            job.setPending();
386    
387            if (jobEndTime.compareTo(endMatdTime) <= 0) {
388                LOG.info("[" + job.getId() + "]: all actions have been materialized, job status = " + job.getStatus()
389                        + ", set pending to true");
390                // set doneMaterialization to true when materialization is done
391                job.setDoneMaterialization();
392            }
393            job.setStatus(StatusUtils.getStatus(job));
394            job.setNextMaterializedTime(endMatdTime);
395        }
396    
397        /* (non-Javadoc)
398         * @see org.apache.oozie.command.XCommand#getKey()
399         */
400        @Override
401        public String getKey() {
402            return getName() + "_" + jobId;
403        }
404    
405        /* (non-Javadoc)
406         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
407         */
408        @Override
409        public void notifyParent() throws CommandException {
410            // update bundle action only when status changes in coord job
411            if (this.coordJob.getBundleId() != null) {
412                if (!prevStatus.equals(coordJob.getStatus())) {
413                    BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
414                    bundleStatusUpdate.call();
415                }
416            }
417        }
418    }