This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.sql.Timestamp;
023 import java.util.Calendar;
024 import java.util.Date;
025 import java.util.TimeZone;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.oozie.CoordinatorActionBean;
029 import org.apache.oozie.CoordinatorJobBean;
030 import org.apache.oozie.ErrorCode;
031 import org.apache.oozie.SLAEventBean;
032 import org.apache.oozie.client.CoordinatorJob;
033 import org.apache.oozie.client.Job;
034 import org.apache.oozie.client.SLAEvent.SlaAppType;
035 import org.apache.oozie.command.CommandException;
036 import org.apache.oozie.command.MaterializeTransitionXCommand;
037 import org.apache.oozie.command.PreconditionException;
038 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
039 import org.apache.oozie.coord.TimeUnit;
040 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
041 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
042 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
043 import org.apache.oozie.executor.jpa.JPAExecutorException;
044 import org.apache.oozie.service.JPAService;
045 import org.apache.oozie.service.Service;
046 import org.apache.oozie.service.Services;
047 import org.apache.oozie.util.DateUtils;
048 import org.apache.oozie.util.Instrumentation;
049 import org.apache.oozie.util.LogUtils;
050 import org.apache.oozie.util.ParamChecker;
051 import org.apache.oozie.util.StatusUtils;
052 import org.apache.oozie.util.XConfiguration;
053 import org.apache.oozie.util.XmlUtils;
054 import org.apache.oozie.util.db.SLADbOperations;
055 import org.jdom.Element;
056
057 /**
058 * Materialize actions for specified start and end time for coordinator job.
059 */
060 public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
061 private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization;
062 private JPAService jpaService = null;
063 private CoordinatorJobBean coordJob = null;
064 private String jobId = null;
065 private Date startMatdTime = null;
066 private Date endMatdTime = null;
067 private final int materializationWindow;
068 private int lastActionNumber = 1; // over-ride by DB value
069 private CoordinatorJob.Status prevStatus = null;
070 /**
071 * Default MAX timeout in minutes, after which coordinator input check will timeout
072 */
073 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
074
075 /**
076 * The constructor for class {@link CoordMaterializeTransitionXCommand}
077 *
078 * @param jobId coordinator job id
079 * @param materializationWindow materialization window to calculate end time
080 */
081 public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) {
082 super("coord_mater", "coord_mater", 1);
083 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
084 this.materializationWindow = materializationWindow;
085 }
086
087 /* (non-Javadoc)
088 * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext()
089 */
090 @Override
091 public void transitToNext() throws CommandException {
092 }
093
094 /* (non-Javadoc)
095 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
096 */
097 @Override
098 public void updateJob() throws CommandException {
099 updateList.add(coordJob);
100 }
101
102 /* (non-Javadoc)
103 * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites()
104 */
105 @Override
106 public void performWrites() throws CommandException {
107 try {
108 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
109 }
110 catch (JPAExecutorException jex) {
111 throw new CommandException(jex);
112 }
113 }
114
115 /* (non-Javadoc)
116 * @see org.apache.oozie.command.XCommand#getEntityKey()
117 */
118 @Override
119 public String getEntityKey() {
120 return this.jobId;
121 }
122
123 @Override
124 protected boolean isLockRequired() {
125 return true;
126 }
127
128 /* (non-Javadoc)
129 * @see org.apache.oozie.command.XCommand#loadState()
130 */
131 @Override
132 protected void loadState() throws CommandException {
133 jpaService = Services.get().get(JPAService.class);
134 if (jpaService == null) {
135 LOG.error(ErrorCode.E0610);
136 }
137
138 try {
139 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
140 prevStatus = coordJob.getStatus();
141 }
142 catch (JPAExecutorException jex) {
143 throw new CommandException(jex);
144 }
145
146 // calculate start materialize and end materialize time
147 calcMatdTime();
148
149 LogUtils.setLogInfo(coordJob, logInfo);
150 }
151
152 /**
153 * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null
154 *
155 * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime
156 */
157 protected void calcMatdTime() throws CommandException {
158 Timestamp startTime = coordJob.getNextMaterializedTimestamp();
159 if (startTime == null) {
160 startTime = coordJob.getStartTimestamp();
161 }
162 // calculate end time by adding materializationWindow to start time.
163 // need to convert materializationWindow from secs to milliseconds
164 long startTimeMilli = startTime.getTime();
165 long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
166
167 startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli));
168 endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli));
169 // if MaterializationWindow end time is greater than endTime
170 // for job, then set it to endTime of job
171 Date jobEndTime = coordJob.getEndTime();
172 if (endMatdTime.compareTo(jobEndTime) > 0) {
173 endMatdTime = jobEndTime;
174 }
175
176 LOG.debug("Materializing coord job id=" + jobId + ", start=" + startMatdTime + ", end=" + endMatdTime
177 + ", window=" + materializationWindow);
178 }
179
180 /* (non-Javadoc)
181 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
182 */
183 @Override
184 protected void verifyPrecondition() throws CommandException, PreconditionException {
185 if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING
186 || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) {
187 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
188 + " job is not in PREP or RUNNING but in " + coordJob.getStatus());
189 }
190
191 if (coordJob.getNextMaterializedTimestamp() != null
192 && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) {
193 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId
194 + " job is already materialized");
195 }
196
197 Timestamp startTime = coordJob.getNextMaterializedTimestamp();
198 if (startTime == null) {
199 startTime = coordJob.getStartTimestamp();
200
201 if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) {
202 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId="
203 + jobId + " job's start time is not reached yet - nothing to materialize");
204 }
205 }
206
207 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) {
208 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
209 + ", all actions have been materialized from start time = " + coordJob.getStartTime()
210 + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr());
211 }
212
213 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) {
214 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
215 + ", action is *already* materialized for Materialization start time = " + startMatdTime
216 + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr());
217 }
218
219 if (endMatdTime.after(coordJob.getEndTime())) {
220 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
221 + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = "
222 + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr());
223 }
224
225 if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) {
226 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId
227 + ", materialization start time = " + startMatdTime
228 + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime()
229 + ", job status = " + coordJob.getStatusStr());
230 }
231
232 }
233
234 /* (non-Javadoc)
235 * @see org.apache.oozie.command.MaterializeTransitionXCommand#materialize()
236 */
237 @Override
238 protected void materialize() throws CommandException {
239 Instrumentation.Cron cron = new Instrumentation.Cron();
240 cron.start();
241 try {
242 materializeActions(false);
243 updateJobMaterializeInfo(coordJob);
244 }
245 catch (CommandException ex) {
246 LOG.warn("Exception occurs:" + ex.getMessage() + " Making the job failed ", ex);
247 coordJob.setStatus(Job.Status.FAILED);
248 coordJob.resetPending();
249 }
250 catch (Exception e) {
251 LOG.error("Excepion thrown :", e);
252 throw new CommandException(ErrorCode.E1001, e.getMessage(), e);
253 }
254 cron.stop();
255
256 }
257
258 /**
259 * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table.
260 *
261 * @param dryrun if this is a dry run
262 * @throws Exception thrown if failed to materialize actions
263 */
264 protected String materializeActions(boolean dryrun) throws Exception {
265
266 Configuration jobConf = null;
267 try {
268 jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
269 }
270 catch (IOException ioe) {
271 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
272 throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
273 }
274
275 String jobXml = coordJob.getJobXml();
276 Element eJob = XmlUtils.parseXml(jobXml);
277 TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
278 int frequency = coordJob.getFrequency();
279 TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
280 TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
281 Calendar start = Calendar.getInstance(appTz);
282 start.setTime(startMatdTime);
283 DateUtils.moveToEnd(start, endOfFlag);
284 Calendar end = Calendar.getInstance(appTz);
285 end.setTime(endMatdTime);
286 lastActionNumber = coordJob.getLastActionNumber();
287 LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end="
288 + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":"
289 + freqTU + ",\n lastActionNumber " + lastActionNumber);
290 // Keep the actual start time
291 Calendar origStart = Calendar.getInstance(appTz);
292 origStart.setTime(coordJob.getStartTimestamp());
293 // Move to the End of duration, if needed.
294 DateUtils.moveToEnd(origStart, endOfFlag);
295 // Cloning the start time to be used in loop iteration
296 Calendar effStart = (Calendar) origStart.clone();
297 // Move the time when the previous action finished
298 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
299
300 StringBuilder actionStrings = new StringBuilder();
301 Date jobPauseTime = coordJob.getPauseTime();
302 Calendar pause = null;
303 if (jobPauseTime != null) {
304 pause = Calendar.getInstance(appTz);
305 pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
306 }
307
308 String action = null;
309 JPAService jpaService = Services.get().get(JPAService.class);
310 int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
311 int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
312 LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
313 + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);
314
315 while (effStart.compareTo(end) < 0 && maxActionToBeCreated-- > 0) {
316 if (pause != null && effStart.compareTo(pause) >= 0) {
317 break;
318 }
319 CoordinatorActionBean actionBean = new CoordinatorActionBean();
320 lastActionNumber++;
321
322 int timeout = coordJob.getTimeout();
323 LOG.debug("Materializing action for time=" + effStart.getTime() + ", lastactionnumber=" + lastActionNumber
324 + " timeout=" + timeout + " minutes");
325 Date actualTime = new Date();
326 action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
327 effStart.getTime(), actualTime, lastActionNumber, jobConf, actionBean);
328 actionBean.setTimeOut(timeout);
329
330 if (!dryrun) {
331 storeToDB(actionBean, action); // Storing to table
332 }
333 else {
334 actionStrings.append("action for new instance");
335 actionStrings.append(action);
336 }
337 // Restore the original start time
338 effStart = (Calendar) origStart.clone();
339 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
340 }
341
342 endMatdTime = new Date(effStart.getTimeInMillis());
343 if (!dryrun) {
344 return action;
345 }
346 else {
347 return actionStrings.toString();
348 }
349 }
350
351 private void storeToDB(CoordinatorActionBean actionBean, String actionXml) throws Exception {
352 LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = "
353 + actionXml.length());
354 actionBean.setActionXml(actionXml);
355
356 insertList.add(actionBean);
357 writeActionRegistration(actionXml, actionBean);
358
359 // TODO: time 100s should be configurable
360 queue(new CoordActionNotificationXCommand(actionBean), 100);
361 queue(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100);
362 }
363
364 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception {
365 Element eAction = XmlUtils.parseXml(actionXml);
366 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
367 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob
368 .getUser(), coordJob.getGroup(), LOG);
369 if(slaEvent != null) {
370 insertList.add(slaEvent);
371 }
372 }
373
374 private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException {
375 job.setLastActionTime(endMatdTime);
376 job.setLastActionNumber(lastActionNumber);
377 // if the job endtime == action endtime, we don't need to materialize this job anymore
378 Date jobEndTime = job.getEndTime();
379
380
381 if (job.getStatus() == CoordinatorJob.Status.PREP){
382 LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING");
383 job.setStatus(Job.Status.RUNNING);
384 }
385 job.setPending();
386
387 if (jobEndTime.compareTo(endMatdTime) <= 0) {
388 LOG.info("[" + job.getId() + "]: all actions have been materialized, job status = " + job.getStatus()
389 + ", set pending to true");
390 // set doneMaterialization to true when materialization is done
391 job.setDoneMaterialization();
392 }
393 job.setStatus(StatusUtils.getStatus(job));
394 job.setNextMaterializedTime(endMatdTime);
395 }
396
397 /* (non-Javadoc)
398 * @see org.apache.oozie.command.XCommand#getKey()
399 */
400 @Override
401 public String getKey() {
402 return getName() + "_" + jobId;
403 }
404
405 /* (non-Javadoc)
406 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
407 */
408 @Override
409 public void notifyParent() throws CommandException {
410 // update bundle action only when status changes in coord job
411 if (this.coordJob.getBundleId() != null) {
412 if (!prevStatus.equals(coordJob.getStatus())) {
413 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
414 bundleStatusUpdate.call();
415 }
416 }
417 }
418 }