001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.sql.Timestamp; 023 import java.util.Calendar; 024 import java.util.Date; 025 import java.util.TimeZone; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.oozie.AppType; 029 import org.apache.oozie.CoordinatorActionBean; 030 import org.apache.oozie.CoordinatorJobBean; 031 import org.apache.oozie.ErrorCode; 032 import org.apache.oozie.SLAEventBean; 033 import org.apache.oozie.client.CoordinatorJob; 034 import org.apache.oozie.client.Job; 035 import org.apache.oozie.client.SLAEvent.SlaAppType; 036 import org.apache.oozie.client.rest.JsonBean; 037 import org.apache.oozie.command.CommandException; 038 import org.apache.oozie.command.MaterializeTransitionXCommand; 039 import org.apache.oozie.command.PreconditionException; 040 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 041 import org.apache.oozie.coord.TimeUnit; 042 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 043 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor; 044 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 045 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 046 import org.apache.oozie.executor.jpa.JPAExecutorException; 047 import org.apache.oozie.service.EventHandlerService; 048 import org.apache.oozie.service.JPAService; 049 import org.apache.oozie.service.Service; 050 import org.apache.oozie.service.Services; 051 import org.apache.oozie.util.DateUtils; 052 import org.apache.oozie.util.Instrumentation; 053 import org.apache.oozie.util.LogUtils; 054 import org.apache.oozie.util.ParamChecker; 055 import org.apache.oozie.sla.SLAOperations; 056 import org.apache.oozie.util.StatusUtils; 057 import org.apache.oozie.util.XConfiguration; 058 import org.apache.oozie.util.XmlUtils; 059 import org.apache.oozie.util.db.SLADbOperations; 060 import org.jdom.Element; 061 062 /** 063 * Materialize actions for specified start and end time for coordinator job. 064 */ 065 @SuppressWarnings("deprecation") 066 public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand { 067 private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization; 068 private JPAService jpaService = null; 069 private CoordinatorJobBean coordJob = null; 070 private String jobId = null; 071 private Date startMatdTime = null; 072 private Date endMatdTime = null; 073 private final int materializationWindow; 074 private int lastActionNumber = 1; // over-ride by DB value 075 private CoordinatorJob.Status prevStatus = null; 076 /** 077 * Default MAX timeout in minutes, after which coordinator input check will timeout 078 */ 079 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout"; 080 081 /** 082 * The constructor for class {@link CoordMaterializeTransitionXCommand} 083 * 084 * @param jobId coordinator job id 085 * @param materializationWindow materialization window to calculate end time 086 */ 087 public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) { 088 super("coord_mater", "coord_mater", 1); 089 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 090 this.materializationWindow = materializationWindow; 091 } 092 093 /* (non-Javadoc) 094 * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext() 095 */ 096 @Override 097 public void transitToNext() throws CommandException { 098 } 099 100 /* (non-Javadoc) 101 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 102 */ 103 @Override 104 public void updateJob() throws CommandException { 105 updateList.add(coordJob); 106 } 107 108 /* (non-Javadoc) 109 * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites() 110 */ 111 @Override 112 public void performWrites() throws CommandException { 113 try { 114 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList)); 115 // register the partition related dependencies of actions 116 for (JsonBean actionBean : insertList) { 117 if (actionBean instanceof CoordinatorActionBean) { 118 CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean; 119 if (EventHandlerService.isEnabled()) { 120 CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null); 121 } 122 if (coordAction.getPushMissingDependencies() != null) { 123 // TODO: Delay in catchup mode? 124 queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100); 125 } 126 } 127 } 128 } 129 catch (JPAExecutorException jex) { 130 throw new CommandException(jex); 131 } 132 } 133 134 /* (non-Javadoc) 135 * @see org.apache.oozie.command.XCommand#getEntityKey() 136 */ 137 @Override 138 public String getEntityKey() { 139 return this.jobId; 140 } 141 142 @Override 143 protected boolean isLockRequired() { 144 return true; 145 } 146 147 /* (non-Javadoc) 148 * @see org.apache.oozie.command.XCommand#loadState() 149 */ 150 @Override 151 protected void loadState() throws CommandException { 152 jpaService = Services.get().get(JPAService.class); 153 if (jpaService == null) { 154 LOG.error(ErrorCode.E0610); 155 } 156 157 try { 158 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 159 prevStatus = coordJob.getStatus(); 160 } 161 catch (JPAExecutorException jex) { 162 throw new CommandException(jex); 163 } 164 165 // calculate start materialize and end materialize time 166 calcMatdTime(); 167 168 LogUtils.setLogInfo(coordJob, logInfo); 169 } 170 171 /** 172 * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null 173 * 174 * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime 175 */ 176 protected void calcMatdTime() throws CommandException { 177 Timestamp startTime = coordJob.getNextMaterializedTimestamp(); 178 if (startTime == null) { 179 startTime = coordJob.getStartTimestamp(); 180 } 181 // calculate end time by adding materializationWindow to start time. 182 // need to convert materializationWindow from secs to milliseconds 183 long startTimeMilli = startTime.getTime(); 184 long endTimeMilli = startTimeMilli + (materializationWindow * 1000); 185 186 startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli)); 187 endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli)); 188 // if MaterializationWindow end time is greater than endTime 189 // for job, then set it to endTime of job 190 Date jobEndTime = coordJob.getEndTime(); 191 if (endMatdTime.compareTo(jobEndTime) > 0) { 192 endMatdTime = jobEndTime; 193 } 194 195 LOG.debug("Materializing coord job id=" + jobId + ", start=" + startMatdTime + ", end=" + endMatdTime 196 + ", window=" + materializationWindow); 197 } 198 199 /* (non-Javadoc) 200 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 201 */ 202 @Override 203 protected void verifyPrecondition() throws CommandException, PreconditionException { 204 if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING 205 || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) { 206 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId 207 + " job is not in PREP or RUNNING but in " + coordJob.getStatus()); 208 } 209 210 if (coordJob.isDoneMaterialization()) { 211 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId =" + jobId 212 + " job is already materialized"); 213 } 214 215 if (coordJob.getNextMaterializedTimestamp() != null 216 && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) { 217 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId 218 + " job is already materialized"); 219 } 220 221 Timestamp startTime = coordJob.getNextMaterializedTimestamp(); 222 if (startTime == null) { 223 startTime = coordJob.getStartTimestamp(); 224 225 if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) { 226 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" 227 + jobId + " job's start time is not reached yet - nothing to materialize"); 228 } 229 } 230 231 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) { 232 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 233 + ", all actions have been materialized from start time = " + coordJob.getStartTime() 234 + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr()); 235 } 236 237 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) { 238 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 239 + ", action is *already* materialized for Materialization start time = " + startMatdTime 240 + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr()); 241 } 242 243 if (endMatdTime.after(coordJob.getEndTime())) { 244 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 245 + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = " 246 + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr()); 247 } 248 249 if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) { 250 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 251 + ", materialization start time = " + startMatdTime 252 + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime() 253 + ", job status = " + coordJob.getStatusStr()); 254 } 255 256 } 257 258 /* (non-Javadoc) 259 * @see org.apache.oozie.command.MaterializeTransitionXCommand#materialize() 260 */ 261 @Override 262 protected void materialize() throws CommandException { 263 Instrumentation.Cron cron = new Instrumentation.Cron(); 264 cron.start(); 265 try { 266 materializeActions(false); 267 updateJobMaterializeInfo(coordJob); 268 } 269 catch (CommandException ex) { 270 LOG.warn("Exception occurred:" + ex.getMessage() + " Making the job failed ", ex); 271 coordJob.setStatus(Job.Status.FAILED); 272 coordJob.resetPending(); 273 // remove any materialized actions and slaEvents 274 insertList.clear(); 275 } 276 catch (Exception e) { 277 LOG.error("Exception occurred:" + e.getMessage() + " Making the job failed ", e); 278 coordJob.setStatus(Job.Status.FAILED); 279 try { 280 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 281 } 282 catch (JPAExecutorException jex) { 283 throw new CommandException(ErrorCode.E1011, jex); 284 } 285 throw new CommandException(ErrorCode.E1012, e.getMessage(), e); 286 } 287 cron.stop(); 288 289 } 290 291 /** 292 * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table. 293 * 294 * @param dryrun if this is a dry run 295 * @throws Exception thrown if failed to materialize actions 296 */ 297 protected String materializeActions(boolean dryrun) throws Exception { 298 299 Configuration jobConf = null; 300 try { 301 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 302 } 303 catch (IOException ioe) { 304 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe); 305 throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe); 306 } 307 308 String jobXml = coordJob.getJobXml(); 309 Element eJob = XmlUtils.parseXml(jobXml); 310 TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone()); 311 int frequency = Integer.valueOf(coordJob.getFrequency()); 312 TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit")); 313 TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration")); 314 Calendar start = Calendar.getInstance(appTz); 315 start.setTime(startMatdTime); 316 DateUtils.moveToEnd(start, endOfFlag); 317 Calendar end = Calendar.getInstance(appTz); 318 end.setTime(endMatdTime); 319 lastActionNumber = coordJob.getLastActionNumber(); 320 LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end=" 321 + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":" 322 + freqTU + ",\n lastActionNumber " + lastActionNumber); 323 // Keep the actual start time 324 Calendar origStart = Calendar.getInstance(appTz); 325 origStart.setTime(coordJob.getStartTimestamp()); 326 // Move to the End of duration, if needed. 327 DateUtils.moveToEnd(origStart, endOfFlag); 328 // Cloning the start time to be used in loop iteration 329 Calendar effStart = (Calendar) origStart.clone(); 330 // Move the time when the previous action finished 331 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency); 332 333 StringBuilder actionStrings = new StringBuilder(); 334 Date jobPauseTime = coordJob.getPauseTime(); 335 Calendar pause = null; 336 if (jobPauseTime != null) { 337 pause = Calendar.getInstance(appTz); 338 pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime)); 339 } 340 341 String action = null; 342 JPAService jpaService = Services.get().get(JPAService.class); 343 int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId())); 344 int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions; 345 LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated 346 + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions); 347 348 while (effStart.compareTo(end) < 0 && maxActionToBeCreated-- > 0) { 349 if (pause != null && effStart.compareTo(pause) >= 0) { 350 break; 351 } 352 CoordinatorActionBean actionBean = new CoordinatorActionBean(); 353 lastActionNumber++; 354 355 int timeout = coordJob.getTimeout(); 356 LOG.debug("Materializing action for time=" + effStart.getTime() + ", lastactionnumber=" + lastActionNumber 357 + " timeout=" + timeout + " minutes"); 358 Date actualTime = new Date(); 359 action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), 360 effStart.getTime(), actualTime, lastActionNumber, jobConf, actionBean); 361 actionBean.setTimeOut(timeout); 362 363 if (!dryrun) { 364 storeToDB(actionBean, action); // Storing to table 365 366 } 367 else { 368 actionStrings.append("action for new instance"); 369 actionStrings.append(action); 370 } 371 // Restore the original start time 372 effStart = (Calendar) origStart.clone(); 373 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency); 374 } 375 376 endMatdTime = new Date(effStart.getTimeInMillis()); 377 if (!dryrun) { 378 return action; 379 } 380 else { 381 return actionStrings.toString(); 382 } 383 } 384 385 private void storeToDB(CoordinatorActionBean actionBean, String actionXml) throws Exception { 386 LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = " 387 + actionXml.length()); 388 actionBean.setActionXml(actionXml); 389 390 insertList.add(actionBean); 391 writeActionSlaRegistration(actionXml, actionBean); 392 393 // TODO: time 100s should be configurable 394 queue(new CoordActionNotificationXCommand(actionBean), 100); 395 queue(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100); 396 } 397 398 private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception { 399 Element eAction = XmlUtils.parseXml(actionXml); 400 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 401 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob 402 .getUser(), coordJob.getGroup(), LOG); 403 if(slaEvent != null) { 404 insertList.add(slaEvent); 405 } 406 // inserting into new table also 407 SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(), 408 AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false); 409 } 410 411 private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException { 412 job.setLastActionTime(endMatdTime); 413 job.setLastActionNumber(lastActionNumber); 414 // if the job endtime == action endtime, we don't need to materialize this job anymore 415 Date jobEndTime = job.getEndTime(); 416 417 418 if (job.getStatus() == CoordinatorJob.Status.PREP){ 419 LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING"); 420 job.setStatus(Job.Status.RUNNING); 421 } 422 job.setPending(); 423 424 if (jobEndTime.compareTo(endMatdTime) <= 0) { 425 LOG.info("[" + job.getId() + "]: all actions have been materialized, job status = " + job.getStatus() 426 + ", set pending to true"); 427 // set doneMaterialization to true when materialization is done 428 job.setDoneMaterialization(); 429 } 430 job.setStatus(StatusUtils.getStatus(job)); 431 job.setNextMaterializedTime(endMatdTime); 432 } 433 434 /* (non-Javadoc) 435 * @see org.apache.oozie.command.XCommand#getKey() 436 */ 437 @Override 438 public String getKey() { 439 return getName() + "_" + jobId; 440 } 441 442 /* (non-Javadoc) 443 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 444 */ 445 @Override 446 public void notifyParent() throws CommandException { 447 // update bundle action only when status changes in coord job 448 if (this.coordJob.getBundleId() != null) { 449 if (!prevStatus.equals(coordJob.getStatus())) { 450 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 451 bundleStatusUpdate.call(); 452 } 453 } 454 } 455 }