001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.sql.Timestamp; 023 import java.util.Calendar; 024 import java.util.Date; 025 import java.util.TimeZone; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.oozie.CoordinatorActionBean; 029 import org.apache.oozie.CoordinatorJobBean; 030 import org.apache.oozie.ErrorCode; 031 import org.apache.oozie.client.CoordinatorJob; 032 import org.apache.oozie.client.Job; 033 import org.apache.oozie.client.SLAEvent.SlaAppType; 034 import org.apache.oozie.command.CommandException; 035 import org.apache.oozie.command.MaterializeTransitionXCommand; 036 import org.apache.oozie.command.PreconditionException; 037 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 038 import org.apache.oozie.coord.TimeUnit; 039 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; 040 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor; 041 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 042 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 043 import org.apache.oozie.executor.jpa.JPAExecutorException; 044 import org.apache.oozie.service.JPAService; 045 import org.apache.oozie.service.Service; 046 import org.apache.oozie.service.Services; 047 import org.apache.oozie.util.DateUtils; 048 import org.apache.oozie.util.Instrumentation; 049 import org.apache.oozie.util.LogUtils; 050 import org.apache.oozie.util.ParamChecker; 051 import org.apache.oozie.util.StatusUtils; 052 import org.apache.oozie.util.XConfiguration; 053 import org.apache.oozie.util.XmlUtils; 054 import org.apache.oozie.util.db.SLADbOperations; 055 import org.jdom.Element; 056 057 /** 058 * Materialize actions for specified start and end time for coordinator job. 059 */ 060 public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand { 061 private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization; 062 private JPAService jpaService = null; 063 private CoordinatorJobBean coordJob = null; 064 private String jobId = null; 065 private Date startMatdTime = null; 066 private Date endMatdTime = null; 067 private final int materializationWindow; 068 private int lastActionNumber = 1; // over-ride by DB value 069 private CoordinatorJob.Status prevStatus = null; 070 /** 071 * Default MAX timeout in minutes, after which coordinator input check will timeout 072 */ 073 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout"; 074 075 /** 076 * The constructor for class {@link CoordMaterializeTransitionXCommand} 077 * 078 * @param jobId coordinator job id 079 * @param materializationWindow materialization window to calculate end time 080 */ 081 public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) { 082 super("coord_mater", "coord_mater", 1); 083 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 084 this.materializationWindow = materializationWindow; 085 } 086 087 /* (non-Javadoc) 088 * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext() 089 */ 090 @Override 091 public void transitToNext() throws CommandException { 092 } 093 094 /* (non-Javadoc) 095 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 096 */ 097 @Override 098 public void updateJob() throws CommandException { 099 try { 100 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 101 } 102 catch (JPAExecutorException jex) { 103 throw new CommandException(jex); 104 } 105 } 106 107 /* (non-Javadoc) 108 * @see org.apache.oozie.command.XCommand#getEntityKey() 109 */ 110 @Override 111 protected String getEntityKey() { 112 return jobId; 113 } 114 115 @Override 116 protected boolean isLockRequired() { 117 return true; 118 } 119 120 /* (non-Javadoc) 121 * @see org.apache.oozie.command.XCommand#loadState() 122 */ 123 @Override 124 protected void loadState() throws CommandException { 125 jpaService = Services.get().get(JPAService.class); 126 if (jpaService == null) { 127 LOG.error(ErrorCode.E0610); 128 } 129 130 try { 131 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 132 prevStatus = coordJob.getStatus(); 133 } 134 catch (JPAExecutorException jex) { 135 throw new CommandException(jex); 136 } 137 138 // calculate start materialize and end materialize time 139 calcMatdTime(); 140 141 LogUtils.setLogInfo(coordJob, logInfo); 142 } 143 144 /** 145 * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null 146 * 147 * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime 148 */ 149 protected void calcMatdTime() throws CommandException { 150 Timestamp startTime = coordJob.getNextMaterializedTimestamp(); 151 if (startTime == null) { 152 startTime = coordJob.getStartTimestamp(); 153 } 154 // calculate end time by adding materializationWindow to start time. 155 // need to convert materializationWindow from secs to milliseconds 156 long startTimeMilli = startTime.getTime(); 157 long endTimeMilli = startTimeMilli + (materializationWindow * 1000); 158 159 startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli)); 160 endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli)); 161 // if MaterializationWindow end time is greater than endTime 162 // for job, then set it to endTime of job 163 Date jobEndTime = coordJob.getEndTime(); 164 if (endMatdTime.compareTo(jobEndTime) > 0) { 165 endMatdTime = jobEndTime; 166 } 167 168 LOG.debug("Materializing coord job id=" + jobId + ", start=" + startMatdTime + ", end=" + endMatdTime 169 + ", window=" + materializationWindow); 170 } 171 172 /* (non-Javadoc) 173 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 174 */ 175 @Override 176 protected void verifyPrecondition() throws CommandException, PreconditionException { 177 if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING)) { 178 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId 179 + " job is not in PREP or RUNNING but in " + coordJob.getStatus()); 180 } 181 182 if (coordJob.getNextMaterializedTimestamp() != null 183 && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) { 184 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId 185 + " job is already materialized"); 186 } 187 188 Timestamp startTime = coordJob.getNextMaterializedTimestamp(); 189 if (startTime == null) { 190 startTime = coordJob.getStartTimestamp(); 191 192 if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) { 193 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" 194 + jobId + " job's start time is not reached yet - nothing to materialize"); 195 } 196 } 197 198 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) { 199 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 200 + ", all actions have been materialized from start time = " + coordJob.getStartTime() 201 + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr()); 202 } 203 204 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) { 205 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 206 + ", action is *already* materialized for Materialization start time = " + startMatdTime 207 + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr()); 208 } 209 210 if (endMatdTime.after(coordJob.getEndTime())) { 211 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 212 + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = " 213 + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr()); 214 } 215 216 if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) { 217 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 218 + ", materialization start time = " + startMatdTime 219 + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime() 220 + ", job status = " + coordJob.getStatusStr()); 221 } 222 223 } 224 225 /* (non-Javadoc) 226 * @see org.apache.oozie.command.MaterializeTransitionXCommand#materialize() 227 */ 228 @Override 229 protected void materialize() throws CommandException { 230 Instrumentation.Cron cron = new Instrumentation.Cron(); 231 cron.start(); 232 try { 233 materializeActions(false); 234 updateJobMaterializeInfo(coordJob); 235 } 236 catch (CommandException ex) { 237 LOG.warn("Exception occurs:" + ex.getMessage() + " Making the job failed ", ex); 238 coordJob.setStatus(Job.Status.FAILED); 239 coordJob.resetPending(); 240 } 241 catch (Exception e) { 242 LOG.error("Excepion thrown :", e); 243 throw new CommandException(ErrorCode.E1001, e.getMessage(), e); 244 } 245 cron.stop(); 246 247 } 248 249 /** 250 * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table. 251 * 252 * @param dryrun if this is a dry run 253 * @throws Exception thrown if failed to materialize actions 254 */ 255 protected String materializeActions(boolean dryrun) throws Exception { 256 257 Configuration jobConf = null; 258 try { 259 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 260 } 261 catch (IOException ioe) { 262 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe); 263 throw new CommandException(ErrorCode.E1005, ioe); 264 } 265 266 String jobXml = coordJob.getJobXml(); 267 Element eJob = XmlUtils.parseXml(jobXml); 268 TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone()); 269 int frequency = coordJob.getFrequency(); 270 TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit")); 271 TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration")); 272 Calendar start = Calendar.getInstance(appTz); 273 start.setTime(startMatdTime); 274 DateUtils.moveToEnd(start, endOfFlag); 275 Calendar end = Calendar.getInstance(appTz); 276 end.setTime(endMatdTime); 277 lastActionNumber = coordJob.getLastActionNumber(); 278 LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end=" 279 + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":" 280 + freqTU + ",\n lastActionNumber " + lastActionNumber); 281 // Keep the actual start time 282 Calendar origStart = Calendar.getInstance(appTz); 283 origStart.setTime(coordJob.getStartTimestamp()); 284 // Move to the End of duration, if needed. 285 DateUtils.moveToEnd(origStart, endOfFlag); 286 // Cloning the start time to be used in loop iteration 287 Calendar effStart = (Calendar) origStart.clone(); 288 // Move the time when the previous action finished 289 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency); 290 291 StringBuilder actionStrings = new StringBuilder(); 292 Date jobPauseTime = coordJob.getPauseTime(); 293 Calendar pause = null; 294 if (jobPauseTime != null) { 295 pause = Calendar.getInstance(appTz); 296 pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime)); 297 } 298 299 String action = null; 300 JPAService jpaService = Services.get().get(JPAService.class); 301 int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId())); 302 int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions; 303 LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated 304 + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions); 305 306 while (effStart.compareTo(end) < 0 && maxActionToBeCreated-- > 0) { 307 if (pause != null && effStart.compareTo(pause) >= 0) { 308 break; 309 } 310 CoordinatorActionBean actionBean = new CoordinatorActionBean(); 311 lastActionNumber++; 312 313 int timeout = coordJob.getTimeout(); 314 LOG.debug("Materializing action for time=" + effStart.getTime() + ", lastactionnumber=" + lastActionNumber 315 + " timeout=" + timeout + " minutes"); 316 Date actualTime = new Date(); 317 action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), 318 effStart.getTime(), actualTime, lastActionNumber, jobConf, actionBean); 319 actionBean.setTimeOut(timeout); 320 321 if (!dryrun) { 322 storeToDB(actionBean, action); // Storing to table 323 } 324 else { 325 actionStrings.append("action for new instance"); 326 actionStrings.append(action); 327 } 328 // Restore the original start time 329 effStart = (Calendar) origStart.clone(); 330 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency); 331 } 332 333 endMatdTime = new Date(effStart.getTimeInMillis()); 334 if (!dryrun) { 335 return action; 336 } 337 else { 338 return actionStrings.toString(); 339 } 340 } 341 342 private void storeToDB(CoordinatorActionBean actionBean, String actionXml) throws Exception { 343 LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = " 344 + actionXml.length()); 345 actionBean.setActionXml(actionXml); 346 347 jpaService.execute(new CoordActionInsertJPAExecutor(actionBean)); 348 writeActionRegistration(actionXml, actionBean); 349 350 // TODO: time 100s should be configurable 351 queue(new CoordActionNotificationXCommand(actionBean), 100); 352 queue(new CoordActionInputCheckXCommand(actionBean.getId()), 100); 353 } 354 355 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception { 356 Element eAction = XmlUtils.parseXml(actionXml); 357 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 358 SLADbOperations.writeSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob 359 .getUser(), coordJob.getGroup(), LOG); 360 } 361 362 private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException { 363 job.setLastActionTime(endMatdTime); 364 job.setLastActionNumber(lastActionNumber); 365 // if the job endtime == action endtime, we don't need to materialize this job anymore 366 Date jobEndTime = job.getEndTime(); 367 368 LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING"); 369 job.setStatus(Job.Status.RUNNING); 370 job.setPending(); 371 372 if (jobEndTime.compareTo(endMatdTime) <= 0) { 373 LOG.info("[" + job.getId() + "]: all actions have been materialized, job status = " + job.getStatus() 374 + ", set pending to true"); 375 // set doneMaterialization to true when materialization is done 376 job.setDoneMaterialization(); 377 } 378 job.setStatus(StatusUtils.getStatus(job)); 379 job.setNextMaterializedTime(endMatdTime); 380 } 381 382 /* (non-Javadoc) 383 * @see org.apache.oozie.command.XCommand#getKey() 384 */ 385 @Override 386 public String getKey() { 387 return getName() + "_" + jobId; 388 } 389 390 /* (non-Javadoc) 391 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 392 */ 393 @Override 394 public void notifyParent() throws CommandException { 395 // update bundle action only when status changes in coord job 396 if (this.coordJob.getBundleId() != null) { 397 if (!prevStatus.equals(coordJob.getStatus())) { 398 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 399 bundleStatusUpdate.call(); 400 } 401 } 402 } 403 }