This project has been retired. For details, please refer to its
Apache Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.sql.Timestamp;
023 import java.util.Calendar;
024 import java.util.Date;
025 import java.util.TimeZone;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.oozie.AppType;
029 import org.apache.oozie.CoordinatorActionBean;
030 import org.apache.oozie.CoordinatorJobBean;
031 import org.apache.oozie.ErrorCode;
032 import org.apache.oozie.SLAEventBean;
033 import org.apache.oozie.client.CoordinatorJob;
034 import org.apache.oozie.client.Job;
035 import org.apache.oozie.client.SLAEvent.SlaAppType;
036 import org.apache.oozie.client.rest.JsonBean;
037 import org.apache.oozie.command.CommandException;
038 import org.apache.oozie.command.MaterializeTransitionXCommand;
039 import org.apache.oozie.command.PreconditionException;
040 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
041 import org.apache.oozie.coord.TimeUnit;
042 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
043 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
044 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
045 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
046 import org.apache.oozie.executor.jpa.JPAExecutorException;
047 import org.apache.oozie.service.EventHandlerService;
048 import org.apache.oozie.service.JPAService;
049 import org.apache.oozie.service.Service;
050 import org.apache.oozie.service.Services;
051 import org.apache.oozie.util.DateUtils;
052 import org.apache.oozie.util.Instrumentation;
053 import org.apache.oozie.util.LogUtils;
054 import org.apache.oozie.util.ParamChecker;
055 import org.apache.oozie.sla.SLAOperations;
056 import org.apache.oozie.util.StatusUtils;
057 import org.apache.oozie.util.XConfiguration;
058 import org.apache.oozie.util.XmlUtils;
059 import org.apache.oozie.util.db.SLADbOperations;
060 import org.jdom.Element;
061
062 /**
063 * Materialize actions for specified start and end time for coordinator job.
064 */
065 @SuppressWarnings("deprecation")
066 public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
067 private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization;
068 private JPAService jpaService = null;
069 private CoordinatorJobBean coordJob = null;
070 private String jobId = null;
071 private Date startMatdTime = null;
072 private Date endMatdTime = null;
073 private final int materializationWindow;
074 private int lastActionNumber = 1; // over-ride by DB value
075 private CoordinatorJob.Status prevStatus = null;
076 /**
077 * Default MAX timeout in minutes, after which coordinator input check will timeout
078 */
079 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
080
081 /**
082 * The constructor for class {@link CoordMaterializeTransitionXCommand}
083 *
084 * @param jobId coordinator job id
085 * @param materializationWindow materialization window to calculate end time
086 */
087 public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) {
088 super("coord_mater", "coord_mater", 1);
089 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
090 this.materializationWindow = materializationWindow;
091 }
092
093 /* (non-Javadoc)
094 * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext()
095 */
096 @Override
097 public void transitToNext() throws CommandException {
098 }
099
100 /* (non-Javadoc)
101 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
102 */
103 @Override
104 public void updateJob() throws CommandException {
105 updateList.add(coordJob);
106 }
107
108 /* (non-Javadoc)
109 * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites()
110 */
111 @Override
112 public void performWrites() throws CommandException {
113 try {
114 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
115 // register the partition related dependencies of actions
116 for (JsonBean actionBean : insertList) {
117 if (actionBean instanceof CoordinatorActionBean) {
118 CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
119 if (EventHandlerService.isEnabled()) {
120 CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
121 }
122 if (coordAction.getPushMissingDependencies() != null) {
123 // TODO: Delay in catchup mode?
124 queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
125 }
126 }
127 }
128 }
129 catch (JPAExecutorException jex) {
130 throw new CommandException(jex);
131 }
132 }
133
134 /* (non-Javadoc)
135 * @see org.apache.oozie.command.XCommand#getEntityKey()
136 */
137 @Override
138 public String getEntityKey() {
139 return this.jobId;
140 }
141
142 @Override
143 protected boolean isLockRequired() {
144 return true;
145 }
146
147 /* (non-Javadoc)
148 * @see org.apache.oozie.command.XCommand#loadState()
149 */
150 @Override
151 protected void loadState() throws CommandException {
152 jpaService = Services.get().get(JPAService.class);
153 if (jpaService == null) {
154 LOG.error(ErrorCode.E0610);
155 }
156
157 try {
158 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
159 prevStatus = coordJob.getStatus();
160 }
161 catch (JPAExecutorException jex) {
162 throw new CommandException(jex);
163 }
164
165 // calculate start materialize and end materialize time
166 calcMatdTime();
167
168 LogUtils.setLogInfo(coordJob, logInfo);
169 }
170
171 /**
172 * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null
173 *
174 * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime
175 */
176 protected void calcMatdTime() throws CommandException {
177 Timestamp startTime = coordJob.getNextMaterializedTimestamp();
178 if (startTime == null) {
179 startTime = coordJob.getStartTimestamp();
180 }
181 // calculate end time by adding materializationWindow to start time.
182 // need to convert materializationWindow from secs to milliseconds
183 long startTimeMilli = startTime.getTime();
184 long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
185
186 startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli));
187 endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli));
188 // if MaterializationWindow end time is greater than endTime
189 // for job, then set it to endTime of job
190 Date jobEndTime = coordJob.getEndTime();
191 if (endMatdTime.compareTo(jobEndTime) > 0) {
192 endMatdTime = jobEndTime;
193 }
194
195 LOG.debug("Materializing coord job id=" + jobId + ", start=" + startMatdTime + ", end=" + endMatdTime
196 + ", window=" + materializationWindow);
197 }
198
199 /* (non-Javadoc)
200 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
201 */
202 @Override
203 protected void verifyPrecondition() throws CommandException, PreconditionException {
204 if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING
205 || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) {
206 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
207 + " job is not in PREP or RUNNING but in " + coordJob.getStatus());
208 }
209
210 if (coordJob.isDoneMaterialization()) {
211 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId =" + jobId
212 + " job is already materialized");
213 }
214
215 if (coordJob.getNextMaterializedTimestamp() != null
216 && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) {
217 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
218 + " job is already materialized");
219 }
220
221 Timestamp startTime = coordJob.getNextMaterializedTimestamp();
222 if (startTime == null) {
223 startTime = coordJob.getStartTimestamp();
224
225 if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) {
226 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId="
227 + jobId + " job's start time is not reached yet - nothing to materialize");
228 }
229 }
230
231 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) {
232 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
233 + ", all actions have been materialized from start time = " + coordJob.getStartTime()
234 + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr());
235 }
236
237 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) {
238 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
239 + ", action is *already* materialized for Materialization start time = " + startMatdTime
240 + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr());
241 }
242
243 if (endMatdTime.after(coordJob.getEndTime())) {
244 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
245 + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = "
246 + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr());
247 }
248
249 if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) {
250 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
251 + ", materialization start time = " + startMatdTime
252 + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime()
253 + ", job status = " + coordJob.getStatusStr());
254 }
255
256 }
257
258 /* (non-Javadoc)
259 * @see org.apache.oozie.command.MaterializeTransitionXCommand#materialize()
260 */
261 @Override
262 protected void materialize() throws CommandException {
263 Instrumentation.Cron cron = new Instrumentation.Cron();
264 cron.start();
265 try {
266 materializeActions(false);
267 updateJobMaterializeInfo(coordJob);
268 }
269 catch (CommandException ex) {
270 LOG.warn("Exception occurred:" + ex.getMessage() + " Making the job failed ", ex);
271 coordJob.setStatus(Job.Status.FAILED);
272 coordJob.resetPending();
273 // remove any materialized actions and slaEvents
274 insertList.clear();
275 }
276 catch (Exception e) {
277 LOG.error("Exception occurred:" + e.getMessage() + " Making the job failed ", e);
278 coordJob.setStatus(Job.Status.FAILED);
279 try {
280 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
281 }
282 catch (JPAExecutorException jex) {
283 throw new CommandException(ErrorCode.E1011, jex);
284 }
285 throw new CommandException(ErrorCode.E1012, e.getMessage(), e);
286 }
287 cron.stop();
288
289 }
290
291 /**
292 * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table.
293 *
294 * @param dryrun if this is a dry run
295 * @throws Exception thrown if failed to materialize actions
296 */
297 protected String materializeActions(boolean dryrun) throws Exception {
298
299 Configuration jobConf = null;
300 try {
301 jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
302 }
303 catch (IOException ioe) {
304 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
305 throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
306 }
307
308 String jobXml = coordJob.getJobXml();
309 Element eJob = XmlUtils.parseXml(jobXml);
310 TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
311 int frequency = Integer.valueOf(coordJob.getFrequency());
312 TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
313 TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
314 Calendar start = Calendar.getInstance(appTz);
315 start.setTime(startMatdTime);
316 DateUtils.moveToEnd(start, endOfFlag);
317 Calendar end = Calendar.getInstance(appTz);
318 end.setTime(endMatdTime);
319 lastActionNumber = coordJob.getLastActionNumber();
320 LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end="
321 + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":"
322 + freqTU + ",\n lastActionNumber " + lastActionNumber);
323 // Keep the actual start time
324 Calendar origStart = Calendar.getInstance(appTz);
325 origStart.setTime(coordJob.getStartTimestamp());
326 // Move to the End of duration, if needed.
327 DateUtils.moveToEnd(origStart, endOfFlag);
328 // Cloning the start time to be used in loop iteration
329 Calendar effStart = (Calendar) origStart.clone();
330 // Move the time when the previous action finished
331 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
332
333 StringBuilder actionStrings = new StringBuilder();
334 Date jobPauseTime = coordJob.getPauseTime();
335 Calendar pause = null;
336 if (jobPauseTime != null) {
337 pause = Calendar.getInstance(appTz);
338 pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
339 }
340
341 String action = null;
342 JPAService jpaService = Services.get().get(JPAService.class);
343 int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
344 int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
345 LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
346 + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);
347
348 while (effStart.compareTo(end) < 0 && maxActionToBeCreated-- > 0) {
349 if (pause != null && effStart.compareTo(pause) >= 0) {
350 break;
351 }
352 CoordinatorActionBean actionBean = new CoordinatorActionBean();
353 lastActionNumber++;
354
355 int timeout = coordJob.getTimeout();
356 LOG.debug("Materializing action for time=" + effStart.getTime() + ", lastactionnumber=" + lastActionNumber
357 + " timeout=" + timeout + " minutes");
358 Date actualTime = new Date();
359 action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
360 effStart.getTime(), actualTime, lastActionNumber, jobConf, actionBean);
361 actionBean.setTimeOut(timeout);
362
363 if (!dryrun) {
364 storeToDB(actionBean, action); // Storing to table
365
366 }
367 else {
368 actionStrings.append("action for new instance");
369 actionStrings.append(action);
370 }
371 // Restore the original start time
372 effStart = (Calendar) origStart.clone();
373 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
374 }
375
376 endMatdTime = new Date(effStart.getTimeInMillis());
377 if (!dryrun) {
378 return action;
379 }
380 else {
381 return actionStrings.toString();
382 }
383 }
384
385 private void storeToDB(CoordinatorActionBean actionBean, String actionXml) throws Exception {
386 LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = "
387 + actionXml.length());
388 actionBean.setActionXml(actionXml);
389
390 insertList.add(actionBean);
391 writeActionSlaRegistration(actionXml, actionBean);
392
393 // TODO: time 100s should be configurable
394 queue(new CoordActionNotificationXCommand(actionBean), 100);
395 queue(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100);
396 }
397
398 private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception {
399 Element eAction = XmlUtils.parseXml(actionXml);
400 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
401 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob
402 .getUser(), coordJob.getGroup(), LOG);
403 if(slaEvent != null) {
404 insertList.add(slaEvent);
405 }
406 // inserting into new table also
407 SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
408 AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false);
409 }
410
411 private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {
412 job.setLastActionTime(endMatdTime);
413 job.setLastActionNumber(lastActionNumber);
414 // if the job endtime == action endtime, we don't need to materialize this job anymore
415 Date jobEndTime = job.getEndTime();
416
417
418 if (job.getStatus() == CoordinatorJob.Status.PREP){
419 LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING");
420 job.setStatus(Job.Status.RUNNING);
421 }
422 job.setPending();
423
424 if (jobEndTime.compareTo(endMatdTime) <= 0) {
425 LOG.info("[" + job.getId() + "]: all actions have been materialized, job status = " + job.getStatus()
426 + ", set pending to true");
427 // set doneMaterialization to true when materialization is done
428 job.setDoneMaterialization();
429 }
430 job.setStatus(StatusUtils.getStatus(job));
431 job.setNextMaterializedTime(endMatdTime);
432 }
433
434 /* (non-Javadoc)
435 * @see org.apache.oozie.command.XCommand#getKey()
436 */
437 @Override
438 public String getKey() {
439 return getName() + "_" + jobId;
440 }
441
442 /* (non-Javadoc)
443 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
444 */
445 @Override
446 public void notifyParent() throws CommandException {
447 // update bundle action only when status changes in coord job
448 if (this.coordJob.getBundleId() != null) {
449 if (!prevStatus.equals(coordJob.getStatus())) {
450 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
451 bundleStatusUpdate.call();
452 }
453 }
454 }
455 }