001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.coord; 019 020import java.io.IOException; 021import java.io.StringReader; 022import java.util.ArrayList; 023import java.util.Calendar; 024import java.util.Date; 025import java.util.List; 026import java.util.TimeZone; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.oozie.AppType; 030import org.apache.oozie.CoordinatorActionBean; 031import org.apache.oozie.CoordinatorJobBean; 032import org.apache.oozie.ErrorCode; 033import org.apache.oozie.SLAEventBean; 034import org.apache.oozie.client.CoordinatorJob; 035import org.apache.oozie.client.SLAEvent.SlaAppType; 036import org.apache.oozie.client.rest.JsonBean; 037import org.apache.oozie.command.CommandException; 038import org.apache.oozie.coord.TimeUnit; 039import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 040import org.apache.oozie.executor.jpa.BatchQueryExecutor; 041import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 042import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 043import org.apache.oozie.executor.jpa.JPAExecutorException; 044import org.apache.oozie.service.JPAService; 045import org.apache.oozie.service.Service; 046import org.apache.oozie.service.Services; 047import org.apache.oozie.store.CoordinatorStore; 048import org.apache.oozie.store.StoreException; 049import org.apache.oozie.util.DateUtils; 050import org.apache.oozie.util.Instrumentation; 051import org.apache.oozie.sla.SLAOperations; 052import org.apache.oozie.util.XConfiguration; 053import org.apache.oozie.util.XLog; 054import org.apache.oozie.util.XmlUtils; 055import org.apache.oozie.util.db.SLADbOperations; 056import org.jdom.Element; 057 058@SuppressWarnings("deprecation") 059public class CoordActionMaterializeCommand extends CoordinatorCommand<Void> { 060 private String jobId; 061 private Date startTime; 062 private Date endTime; 063 private int lastActionNumber = 1; // over-ride by DB value 064 private final XLog log = XLog.getLog(getClass()); 065 private String user; 066 private String group; 067 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 068 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 069 070 /** 071 * Default timeout for catchup jobs, in minutes, after which coordinator input check will timeout 072 */ 073 public static final String CONF_DEFAULT_TIMEOUT_CATCHUP = Service.CONF_PREFIX + "coord.catchup.default.timeout"; 074 075 public CoordActionMaterializeCommand(String jobId, Date startTime, Date endTime) { 076 super("coord_action_mater", "coord_action_mater", 1, XLog.STD, false); 077 this.jobId = jobId; 078 this.startTime = startTime; 079 this.endTime = endTime; 080 } 081 082 @Override 083 protected Void call(CoordinatorStore store) throws CommandException { 084 CoordJobGetJPAExecutor getCoordJob = new CoordJobGetJPAExecutor(jobId); 085 CoordinatorJobBean job; 086 try { 087 job = Services.get().get(JPAService.class).execute(getCoordJob); 088 } 089 catch (JPAExecutorException jex) { 090 throw new CommandException(jex); 091 } 092 setLogInfo(job); 093 if (job.getLastActionTime() != null && job.getLastActionTime().compareTo(endTime) >= 0) { 094 log.info("ENDED Coordinator materialization for jobId = " + jobId 095 + " Action is *already* materialized for Materialization start time = " + startTime + " : Materialization end time = " + endTime + " Job status = " + job.getStatusStr()); 096 return null; 097 } 098 099 if (endTime.after(job.getEndTime())) { 100 log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization end time = " + endTime 101 + " surpasses coordinator job's end time = " + job.getEndTime() + " Job status = " + job.getStatusStr()); 102 return null; 103 } 104 105 if (job.getPauseTime() != null && !startTime.before(job.getPauseTime())) { 106 log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization start time = " + startTime 107 + " is after or equal to coordinator job's pause time = " + job.getPauseTime() + " Job status = " + job.getStatusStr()); 108 // pausetime blocks real materialization - we change job's status back to RUNNING; 109 if (job.getStatus() == CoordinatorJob.Status.PREMATER) { 110 job.setStatus(CoordinatorJob.Status.RUNNING); 111 } 112 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS, job)); 113 return null; 114 } 115 116 this.user = job.getUser(); 117 this.group = job.getGroup(); 118 119 if (job.getStatus().equals(CoordinatorJobBean.Status.PREMATER)) { 120 Configuration jobConf = null; 121 log.debug("start job :" + jobId + " Materialization "); 122 try { 123 jobConf = new XConfiguration(new StringReader(job.getConf())); 124 } 125 catch (IOException ioe) { 126 log.warn("Configuration parse error. read from DB :" + job.getConf(), ioe); 127 throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe); 128 } 129 130 try { 131 materializeJobs(false, job, jobConf, store); 132 updateJobTable(job, store); 133 } 134 catch (CommandException ex) { 135 log.warn("Exception occurs:" + ex + " Making the job failed "); 136 job.setStatus(CoordinatorJobBean.Status.FAILED); 137 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, job)); 138 } 139 catch (Exception e) { 140 log.error("Excepion thrown :", e); 141 throw new CommandException(ErrorCode.E1001, e.getMessage(), e); 142 } 143 } 144 else { 145 log.info("WARN: action is not in PREMATER state! It's in state=" + job.getStatus()); 146 } 147 return null; 148 } 149 150 /** 151 * Create action instances starting from "start-time" to end-time" and store them into Action table. 152 * 153 * @param dryrun 154 * @param jobBean 155 * @param conf 156 * @param store 157 * @throws Exception 158 */ 159 protected String materializeJobs(boolean dryrun, CoordinatorJobBean jobBean, Configuration conf, 160 CoordinatorStore store) throws Exception { 161 String jobXml = jobBean.getJobXml(); 162 Element eJob = XmlUtils.parseXml(jobXml); 163 // TODO: always UTC? 164 TimeZone appTz = DateUtils.getTimeZone(jobBean.getTimeZone()); 165 // TimeZone appTz = DateUtils.getTimeZone("UTC"); 166 int frequency = Integer.valueOf(jobBean.getFrequency()); 167 TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit")); 168 TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration")); 169 Calendar start = Calendar.getInstance(appTz); 170 start.setTime(startTime); 171 DateUtils.moveToEnd(start, endOfFlag); 172 Calendar end = Calendar.getInstance(appTz); 173 end.setTime(endTime); 174 lastActionNumber = jobBean.getLastActionNumber(); 175 // DateUtils.moveToEnd(end, endOfFlag); 176 log.info(" *** materialize Actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() 177 + ", end=" + end.getTime() + "\n TimeUNIT " + freqTU.getCalendarUnit() + " Frequency :" + frequency 178 + ":" + freqTU + " lastActionNumber " + lastActionNumber); 179 // Keep the actual start time 180 Calendar origStart = Calendar.getInstance(appTz); 181 origStart.setTime(jobBean.getStartTimestamp()); 182 // Move to the End of duration, if needed. 183 DateUtils.moveToEnd(origStart, endOfFlag); 184 // Cloning the start time to be used in loop iteration 185 Calendar effStart = (Calendar) origStart.clone(); 186 // Move the time when the previous action finished 187 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency); 188 189 String action = null; 190 StringBuilder actionStrings = new StringBuilder(); 191 Date jobPauseTime = jobBean.getPauseTime(); 192 Calendar pause = null; 193 if (jobPauseTime != null) { 194 pause = Calendar.getInstance(appTz); 195 pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime)); 196 } 197 198 while (effStart.compareTo(end) < 0) { 199 if (pause != null && effStart.compareTo(pause) >= 0) { 200 break; 201 } 202 CoordinatorActionBean actionBean = new CoordinatorActionBean(); 203 lastActionNumber++; 204 205 int timeout = jobBean.getTimeout(); 206 log.debug(origStart.getTime() + " Materializing action for time=" + effStart.getTime() 207 + ", lastactionnumber=" + lastActionNumber); 208 Date actualTime = new Date(); 209 action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), 210 effStart.getTime(), actualTime, lastActionNumber, conf, actionBean); 211 int catchUpTOMultiplier = 1; // This value might be could be changed in future 212 if (actionBean.getNominalTimestamp().before(jobBean.getCreatedTimestamp())) { 213 // Catchup action 214 timeout = catchUpTOMultiplier * timeout; 215 // actionBean.setTimeOut(Services.get().getConf().getInt(CONF_DEFAULT_TIMEOUT_CATCHUP, 216 // -1)); 217 log.info("Catchup timeout is :" + actionBean.getTimeOut()); 218 } 219 actionBean.setTimeOut(timeout); 220 221 if (!dryrun) { 222 storeToDB(actionBean, action, store, jobBean.getAppName()); // Storing to table 223 } 224 else { 225 actionStrings.append("action for new instance"); 226 actionStrings.append(action); 227 } 228 // Restore the original start time 229 effStart = (Calendar) origStart.clone(); 230 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency); 231 } 232 233 endTime = new Date(effStart.getTimeInMillis()); 234 if (!dryrun) { 235 return action; 236 } 237 else { 238 return actionStrings.toString(); 239 } 240 } 241 242 /** 243 * Store an Action into database table. 244 * 245 * @param actionBean 246 * @param actionXml 247 * @param store 248 * @param appName 249 * @throws Exception 250 */ 251 private void storeToDB(CoordinatorActionBean actionBean, String actionXml, CoordinatorStore store, String appName) 252 throws Exception { 253 log.debug("In storeToDB() action Id " + actionBean.getId() + " Size of actionXml " + actionXml.length()); 254 actionBean.setActionXml(actionXml); 255 insertList.add(actionBean); 256 createActionRegistration(actionXml, actionBean, store, appName); 257 258 // TODO: time 100s should be configurable 259 queueCallable(new CoordActionNotificationXCommand(actionBean), 100); 260 queueCallable(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100); 261 } 262 263 /** 264 * @param actionXml 265 * @param actionBean 266 * @param store 267 * @param appName 268 * @throws Exception 269 */ 270 private void createActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store, 271 String appName) throws Exception { 272 Element eAction = XmlUtils.parseXml(actionXml); 273 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 274 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, store, actionBean.getId(), 275 SlaAppType.COORDINATOR_ACTION, user, group); 276 if(slaEvent != null) { 277 insertList.add(slaEvent); 278 } 279 // insert into new sla reg table too 280 SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(), 281 AppType.COORDINATOR_ACTION, user, appName, log, false); 282 } 283 284 /** 285 * @param job 286 * @param store 287 * @throws StoreException 288 */ 289 private void updateJobTable(CoordinatorJobBean job, CoordinatorStore store) { 290 // TODO: why do we need this? Isn't lastMatTime enough??? 291 job.setLastActionTime(endTime); 292 job.setLastActionNumber(lastActionNumber); 293 // if the job endtime == action endtime, then set status of job to 294 // succeeded 295 // we dont need to materialize this job anymore 296 Date jobEndTime = job.getEndTime(); 297 if (jobEndTime.compareTo(endTime) <= 0) { 298 job.setStatus(CoordinatorJob.Status.SUCCEEDED); 299 log.info("[" + job.getId() + "]: Update status from PREMATER to SUCCEEDED"); 300 } 301 else { 302 job.setStatus(CoordinatorJob.Status.RUNNING); 303 log.info("[" + job.getId() + "]: Update status from PREMATER to RUNNING"); 304 } 305 job.setNextMaterializedTime(endTime); 306 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, job)); 307 } 308 309 @Override 310 protected Void execute(CoordinatorStore store) throws StoreException, CommandException { 311 log.info("STARTED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime + ", endTime=" 312 + endTime); 313 try { 314 if (lock(jobId)) { 315 call(store); 316 JPAService jpaService = Services.get().get(JPAService.class); 317 if (jpaService != null) { 318 try { 319 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 320 } 321 catch (JPAExecutorException je) { 322 throw new CommandException(je); 323 } 324 } 325 else { 326 throw new CommandException(ErrorCode.E0610); 327 } 328 } 329 else { 330 queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime), 331 LOCK_FAILURE_REQUEUE_INTERVAL); 332 log.warn("CoordActionMaterializeCommand lock was not acquired - failed jobId=" + jobId 333 + ". Requeing the same."); 334 } 335 } 336 catch (InterruptedException e) { 337 queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime), LOCK_FAILURE_REQUEUE_INTERVAL); 338 log.warn("CoordActionMaterializeCommand lock acquiring failed with exception " + e.getMessage() 339 + " for jobId=" + jobId + " Requeing the same."); 340 } 341 finally { 342 log.info(" ENDED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime 343 + ", endTime=" + endTime); 344 } 345 return null; 346 } 347 348 349 350 /** 351 * For preliminery testing. Should be removed soon 352 * 353 * @param args 354 * @throws Exception 355 */ 356 public static void main(String[] args) throws Exception { 357 new Services().init(); 358 try { 359 Date startTime = DateUtils.parseDateUTC("2009-02-01T01:00Z"); 360 Date endTime = DateUtils.parseDateUTC("2009-02-02T01:00Z"); 361 String jobId = "0000000-091207151850551-oozie-dani-C"; 362 CoordActionMaterializeCommand matCmd = new CoordActionMaterializeCommand(jobId, startTime, endTime); 363 matCmd.call(); 364 } 365 finally { 366 try { 367 Thread.sleep(60000); 368 } 369 catch (Exception ex) { 370 } 371 new Services().destroy(); 372 } 373 } 374 375}