001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.sql.Timestamp; 023 import java.util.Calendar; 024 import java.util.Date; 025 import java.util.TimeZone; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.oozie.CoordinatorActionBean; 029 import org.apache.oozie.CoordinatorJobBean; 030 import org.apache.oozie.ErrorCode; 031 import org.apache.oozie.SLAEventBean; 032 import org.apache.oozie.client.CoordinatorJob; 033 import org.apache.oozie.client.Job; 034 import org.apache.oozie.client.SLAEvent.SlaAppType; 035 import org.apache.oozie.command.CommandException; 036 import org.apache.oozie.command.MaterializeTransitionXCommand; 037 import org.apache.oozie.command.PreconditionException; 038 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 039 import org.apache.oozie.coord.TimeUnit; 040 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 041 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor; 042 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 043 import org.apache.oozie.executor.jpa.JPAExecutorException; 044 import org.apache.oozie.service.JPAService; 045 import org.apache.oozie.service.Service; 046 import org.apache.oozie.service.Services; 047 import org.apache.oozie.util.DateUtils; 048 import org.apache.oozie.util.Instrumentation; 049 import org.apache.oozie.util.LogUtils; 050 import org.apache.oozie.util.ParamChecker; 051 import org.apache.oozie.util.StatusUtils; 052 import org.apache.oozie.util.XConfiguration; 053 import org.apache.oozie.util.XmlUtils; 054 import org.apache.oozie.util.db.SLADbOperations; 055 import org.jdom.Element; 056 057 /** 058 * Materialize actions for specified start and end time for coordinator job. 059 */ 060 public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand { 061 private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization; 062 private JPAService jpaService = null; 063 private CoordinatorJobBean coordJob = null; 064 private String jobId = null; 065 private Date startMatdTime = null; 066 private Date endMatdTime = null; 067 private final int materializationWindow; 068 private int lastActionNumber = 1; // over-ride by DB value 069 private CoordinatorJob.Status prevStatus = null; 070 /** 071 * Default MAX timeout in minutes, after which coordinator input check will timeout 072 */ 073 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout"; 074 075 /** 076 * The constructor for class {@link CoordMaterializeTransitionXCommand} 077 * 078 * @param jobId coordinator job id 079 * @param materializationWindow materialization window to calculate end time 080 */ 081 public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) { 082 super("coord_mater", "coord_mater", 1); 083 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 084 this.materializationWindow = materializationWindow; 085 } 086 087 /* (non-Javadoc) 088 * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext() 089 */ 090 @Override 091 public void transitToNext() throws CommandException { 092 } 093 094 /* (non-Javadoc) 095 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 096 */ 097 @Override 098 public void updateJob() throws CommandException { 099 updateList.add(coordJob); 100 } 101 102 /* (non-Javadoc) 103 * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites() 104 */ 105 @Override 106 public void performWrites() throws CommandException { 107 try { 108 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList)); 109 } 110 catch (JPAExecutorException jex) { 111 throw new CommandException(jex); 112 } 113 } 114 115 /* (non-Javadoc) 116 * @see org.apache.oozie.command.XCommand#getEntityKey() 117 */ 118 @Override 119 public String getEntityKey() { 120 return this.jobId; 121 } 122 123 @Override 124 protected boolean isLockRequired() { 125 return true; 126 } 127 128 /* (non-Javadoc) 129 * @see org.apache.oozie.command.XCommand#loadState() 130 */ 131 @Override 132 protected void loadState() throws CommandException { 133 jpaService = Services.get().get(JPAService.class); 134 if (jpaService == null) { 135 LOG.error(ErrorCode.E0610); 136 } 137 138 try { 139 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 140 prevStatus = coordJob.getStatus(); 141 } 142 catch (JPAExecutorException jex) { 143 throw new CommandException(jex); 144 } 145 146 // calculate start materialize and end materialize time 147 calcMatdTime(); 148 149 LogUtils.setLogInfo(coordJob, logInfo); 150 } 151 152 /** 153 * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null 154 * 155 * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime 156 */ 157 protected void calcMatdTime() throws CommandException { 158 Timestamp startTime = coordJob.getNextMaterializedTimestamp(); 159 if (startTime == null) { 160 startTime = coordJob.getStartTimestamp(); 161 } 162 // calculate end time by adding materializationWindow to start time. 163 // need to convert materializationWindow from secs to milliseconds 164 long startTimeMilli = startTime.getTime(); 165 long endTimeMilli = startTimeMilli + (materializationWindow * 1000); 166 167 startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli)); 168 endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli)); 169 // if MaterializationWindow end time is greater than endTime 170 // for job, then set it to endTime of job 171 Date jobEndTime = coordJob.getEndTime(); 172 if (endMatdTime.compareTo(jobEndTime) > 0) { 173 endMatdTime = jobEndTime; 174 } 175 176 LOG.debug("Materializing coord job id=" + jobId + ", start=" + startMatdTime + ", end=" + endMatdTime 177 + ", window=" + materializationWindow); 178 } 179 180 /* (non-Javadoc) 181 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 182 */ 183 @Override 184 protected void verifyPrecondition() throws CommandException, PreconditionException { 185 if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING 186 || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) { 187 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId 188 + " job is not in PREP or RUNNING but in " + coordJob.getStatus()); 189 } 190 191 if (coordJob.getNextMaterializedTimestamp() != null 192 && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) { 193 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId 194 + " job is already materialized"); 195 } 196 197 Timestamp startTime = coordJob.getNextMaterializedTimestamp(); 198 if (startTime == null) { 199 startTime = coordJob.getStartTimestamp(); 200 201 if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) { 202 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" 203 + jobId + " job's start time is not reached yet - nothing to materialize"); 204 } 205 } 206 207 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) { 208 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 209 + ", all actions have been materialized from start time = " + coordJob.getStartTime() 210 + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr()); 211 } 212 213 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) { 214 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 215 + ", action is *already* materialized for Materialization start time = " + startMatdTime 216 + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr()); 217 } 218 219 if (endMatdTime.after(coordJob.getEndTime())) { 220 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 221 + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = " 222 + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr()); 223 } 224 225 if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) { 226 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 227 + ", materialization start time = " + startMatdTime 228 + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime() 229 + ", job status = " + coordJob.getStatusStr()); 230 } 231 232 } 233 234 /* (non-Javadoc) 235 * @see org.apache.oozie.command.MaterializeTransitionXCommand#materialize() 236 */ 237 @Override 238 protected void materialize() throws CommandException { 239 Instrumentation.Cron cron = new Instrumentation.Cron(); 240 cron.start(); 241 try { 242 materializeActions(false); 243 updateJobMaterializeInfo(coordJob); 244 } 245 catch (CommandException ex) { 246 LOG.warn("Exception occurs:" + ex.getMessage() + " Making the job failed ", ex); 247 coordJob.setStatus(Job.Status.FAILED); 248 coordJob.resetPending(); 249 } 250 catch (Exception e) { 251 LOG.error("Excepion thrown :", e); 252 throw new CommandException(ErrorCode.E1001, e.getMessage(), e); 253 } 254 cron.stop(); 255 256 } 257 258 /** 259 * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table. 260 * 261 * @param dryrun if this is a dry run 262 * @throws Exception thrown if failed to materialize actions 263 */ 264 protected String materializeActions(boolean dryrun) throws Exception { 265 266 Configuration jobConf = null; 267 try { 268 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 269 } 270 catch (IOException ioe) { 271 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe); 272 throw new CommandException(ErrorCode.E1005, ioe); 273 } 274 275 String jobXml = coordJob.getJobXml(); 276 Element eJob = XmlUtils.parseXml(jobXml); 277 TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone()); 278 int frequency = coordJob.getFrequency(); 279 TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit")); 280 TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration")); 281 Calendar start = Calendar.getInstance(appTz); 282 start.setTime(startMatdTime); 283 DateUtils.moveToEnd(start, endOfFlag); 284 Calendar end = Calendar.getInstance(appTz); 285 end.setTime(endMatdTime); 286 lastActionNumber = coordJob.getLastActionNumber(); 287 LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end=" 288 + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":" 289 + freqTU + ",\n lastActionNumber " + lastActionNumber); 290 // Keep the actual start time 291 Calendar origStart = Calendar.getInstance(appTz); 292 origStart.setTime(coordJob.getStartTimestamp()); 293 // Move to the End of duration, if needed. 294 DateUtils.moveToEnd(origStart, endOfFlag); 295 // Cloning the start time to be used in loop iteration 296 Calendar effStart = (Calendar) origStart.clone(); 297 // Move the time when the previous action finished 298 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency); 299 300 StringBuilder actionStrings = new StringBuilder(); 301 Date jobPauseTime = coordJob.getPauseTime(); 302 Calendar pause = null; 303 if (jobPauseTime != null) { 304 pause = Calendar.getInstance(appTz); 305 pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime)); 306 } 307 308 String action = null; 309 JPAService jpaService = Services.get().get(JPAService.class); 310 int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId())); 311 int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions; 312 LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated 313 + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions); 314 315 while (effStart.compareTo(end) < 0 && maxActionToBeCreated-- > 0) { 316 if (pause != null && effStart.compareTo(pause) >= 0) { 317 break; 318 } 319 CoordinatorActionBean actionBean = new CoordinatorActionBean(); 320 lastActionNumber++; 321 322 int timeout = coordJob.getTimeout(); 323 LOG.debug("Materializing action for time=" + effStart.getTime() + ", lastactionnumber=" + lastActionNumber 324 + " timeout=" + timeout + " minutes"); 325 Date actualTime = new Date(); 326 action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), 327 effStart.getTime(), actualTime, lastActionNumber, jobConf, actionBean); 328 actionBean.setTimeOut(timeout); 329 330 if (!dryrun) { 331 storeToDB(actionBean, action); // Storing to table 332 } 333 else { 334 actionStrings.append("action for new instance"); 335 actionStrings.append(action); 336 } 337 // Restore the original start time 338 effStart = (Calendar) origStart.clone(); 339 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency); 340 } 341 342 endMatdTime = new Date(effStart.getTimeInMillis()); 343 if (!dryrun) { 344 return action; 345 } 346 else { 347 return actionStrings.toString(); 348 } 349 } 350 351 private void storeToDB(CoordinatorActionBean actionBean, String actionXml) throws Exception { 352 LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = " 353 + actionXml.length()); 354 actionBean.setActionXml(actionXml); 355 356 insertList.add(actionBean); 357 writeActionRegistration(actionXml, actionBean); 358 359 // TODO: time 100s should be configurable 360 queue(new CoordActionNotificationXCommand(actionBean), 100); 361 queue(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100); 362 } 363 364 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception { 365 Element eAction = XmlUtils.parseXml(actionXml); 366 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 367 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob 368 .getUser(), coordJob.getGroup(), LOG); 369 if(slaEvent != null) { 370 insertList.add(slaEvent); 371 } 372 } 373 374 private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException { 375 job.setLastActionTime(endMatdTime); 376 job.setLastActionNumber(lastActionNumber); 377 // if the job endtime == action endtime, we don't need to materialize this job anymore 378 Date jobEndTime = job.getEndTime(); 379 380 381 if (job.getStatus() == CoordinatorJob.Status.PREP){ 382 LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING"); 383 job.setStatus(Job.Status.RUNNING); 384 } 385 job.setPending(); 386 387 if (jobEndTime.compareTo(endMatdTime) <= 0) { 388 LOG.info("[" + job.getId() + "]: all actions have been materialized, job status = " + job.getStatus() 389 + ", set pending to true"); 390 // set doneMaterialization to true when materialization is done 391 job.setDoneMaterialization(); 392 } 393 job.setStatus(StatusUtils.getStatus(job)); 394 job.setNextMaterializedTime(endMatdTime); 395 } 396 397 /* (non-Javadoc) 398 * @see org.apache.oozie.command.XCommand#getKey() 399 */ 400 @Override 401 public String getKey() { 402 return getName() + "_" + jobId; 403 } 404 405 /* (non-Javadoc) 406 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 407 */ 408 @Override 409 public void notifyParent() throws CommandException { 410 // update bundle action only when status changes in coord job 411 if (this.coordJob.getBundleId() != null) { 412 if (!prevStatus.equals(coordJob.getStatus())) { 413 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 414 bundleStatusUpdate.call(); 415 } 416 } 417 } 418 }