This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.ArrayList;
023 import java.util.Calendar;
024 import java.util.Date;
025 import java.util.List;
026 import java.util.TimeZone;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.oozie.CoordinatorActionBean;
030 import org.apache.oozie.CoordinatorJobBean;
031 import org.apache.oozie.ErrorCode;
032 import org.apache.oozie.SLAEventBean;
033 import org.apache.oozie.client.CoordinatorJob;
034 import org.apache.oozie.client.SLAEvent.SlaAppType;
035 import org.apache.oozie.client.rest.JsonBean;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.coord.TimeUnit;
038 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
039 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
040 import org.apache.oozie.executor.jpa.JPAExecutorException;
041 import org.apache.oozie.service.JPAService;
042 import org.apache.oozie.service.Service;
043 import org.apache.oozie.service.Services;
044 import org.apache.oozie.store.CoordinatorStore;
045 import org.apache.oozie.store.StoreException;
046 import org.apache.oozie.util.DateUtils;
047 import org.apache.oozie.util.Instrumentation;
048 import org.apache.oozie.util.XConfiguration;
049 import org.apache.oozie.util.XLog;
050 import org.apache.oozie.util.XmlUtils;
051 import org.apache.oozie.util.db.SLADbOperations;
052 import org.jdom.Element;
053 import org.jdom.JDOMException;
054
055 public class CoordActionMaterializeCommand extends CoordinatorCommand<Void> {
056 private String jobId;
057 private Date startTime;
058 private Date endTime;
059 private int lastActionNumber = 1; // over-ride by DB value
060 private final XLog log = XLog.getLog(getClass());
061 private String user;
062 private String group;
063 private List<JsonBean> insertList = new ArrayList<JsonBean>();
064 private List<JsonBean> updateList = new ArrayList<JsonBean>();
065
066 /**
067 * Default timeout for catchup jobs, in minutes, after which coordinator input check will timeout
068 */
069 public static final String CONF_DEFAULT_TIMEOUT_CATCHUP = Service.CONF_PREFIX + "coord.catchup.default.timeout";
070
/**
 * Creates a command that materializes the actions of one coordinator job for a time window.
 *
 * @param jobId id of the coordinator job
 * @param startTime start of the materialization window
 * @param endTime end of the materialization window
 */
public CoordActionMaterializeCommand(String jobId, Date startTime, Date endTime) {
    super("coord_action_mater", "coord_action_mater", 1, XLog.STD, false);
    this.endTime = endTime;
    this.startTime = startTime;
    this.jobId = jobId;
}
077
078 @Override
079 protected Void call(CoordinatorStore store) throws CommandException {
080 CoordJobGetJPAExecutor getCoordJob = new CoordJobGetJPAExecutor(jobId);
081 CoordinatorJobBean job;
082 try {
083 job = Services.get().get(JPAService.class).execute(getCoordJob);
084 }
085 catch (JPAExecutorException jex) {
086 throw new CommandException(jex);
087 }
088 setLogInfo(job);
089 if (job.getLastActionTime() != null && job.getLastActionTime().compareTo(endTime) >= 0) {
090 log.info("ENDED Coordinator materialization for jobId = " + jobId
091 + " Action is *already* materialized for Materialization start time = " + startTime + " : Materialization end time = " + endTime + " Job status = " + job.getStatusStr());
092 return null;
093 }
094
095 if (endTime.after(job.getEndTime())) {
096 log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization end time = " + endTime
097 + " surpasses coordinator job's end time = " + job.getEndTime() + " Job status = " + job.getStatusStr());
098 return null;
099 }
100
101 if (job.getPauseTime() != null && !startTime.before(job.getPauseTime())) {
102 log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization start time = " + startTime
103 + " is after or equal to coordinator job's pause time = " + job.getPauseTime() + " Job status = " + job.getStatusStr());
104 // pausetime blocks real materialization - we change job's status back to RUNNING;
105 if (job.getStatus() == CoordinatorJob.Status.PREMATER) {
106 job.setStatus(CoordinatorJob.Status.RUNNING);
107 }
108 updateList.add(job);
109 return null;
110 }
111
112 this.user = job.getUser();
113 this.group = job.getGroup();
114
115 if (job.getStatus().equals(CoordinatorJobBean.Status.PREMATER)) {
116 Configuration jobConf = null;
117 log.debug("start job :" + jobId + " Materialization ");
118 try {
119 jobConf = new XConfiguration(new StringReader(job.getConf()));
120 }
121 catch (IOException ioe) {
122 log.warn("Configuration parse error. read from DB :" + job.getConf(), ioe);
123 throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
124 }
125
126 Instrumentation.Cron cron = new Instrumentation.Cron();
127 cron.start();
128 try {
129 materializeJobs(false, job, jobConf, store);
130 updateJobTable(job, store);
131 }
132 catch (CommandException ex) {
133 log.warn("Exception occurs:" + ex + " Making the job failed ");
134 job.setStatus(CoordinatorJobBean.Status.FAILED);
135 updateList.add(job);
136 }
137 catch (Exception e) {
138 log.error("Excepion thrown :", e);
139 throw new CommandException(ErrorCode.E1001, e.getMessage(), e);
140 }
141 cron.stop();
142 }
143 else {
144 log.info("WARN: action is not in PREMATER state! It's in state=" + job.getStatus());
145 }
146 return null;
147 }
148
149 /**
150 * Create action instances starting from "start-time" to end-time" and store them into Action table.
151 *
152 * @param dryrun
153 * @param jobBean
154 * @param conf
155 * @param store
156 * @throws Exception
157 */
158 protected String materializeJobs(boolean dryrun, CoordinatorJobBean jobBean, Configuration conf,
159 CoordinatorStore store) throws Exception {
160 String jobXml = jobBean.getJobXml();
161 Element eJob = XmlUtils.parseXml(jobXml);
162 // TODO: always UTC?
163 TimeZone appTz = DateUtils.getTimeZone(jobBean.getTimeZone());
164 // TimeZone appTz = DateUtils.getTimeZone("UTC");
165 int frequency = jobBean.getFrequency();
166 TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
167 TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
168 Calendar start = Calendar.getInstance(appTz);
169 start.setTime(startTime);
170 DateUtils.moveToEnd(start, endOfFlag);
171 Calendar end = Calendar.getInstance(appTz);
172 end.setTime(endTime);
173 lastActionNumber = jobBean.getLastActionNumber();
174 // DateUtils.moveToEnd(end, endOfFlag);
175 log.info(" *** materialize Actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime()
176 + ", end=" + end.getTime() + "\n TimeUNIT " + freqTU.getCalendarUnit() + " Frequency :" + frequency
177 + ":" + freqTU + " lastActionNumber " + lastActionNumber);
178 // Keep the actual start time
179 Calendar origStart = Calendar.getInstance(appTz);
180 origStart.setTime(jobBean.getStartTimestamp());
181 // Move to the End of duration, if needed.
182 DateUtils.moveToEnd(origStart, endOfFlag);
183 // Cloning the start time to be used in loop iteration
184 Calendar effStart = (Calendar) origStart.clone();
185 // Move the time when the previous action finished
186 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
187
188 String action = null;
189 StringBuilder actionStrings = new StringBuilder();
190 Date jobPauseTime = jobBean.getPauseTime();
191 Calendar pause = null;
192 if (jobPauseTime != null) {
193 pause = Calendar.getInstance(appTz);
194 pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
195 }
196
197 while (effStart.compareTo(end) < 0) {
198 if (pause != null && effStart.compareTo(pause) >= 0) {
199 break;
200 }
201 CoordinatorActionBean actionBean = new CoordinatorActionBean();
202 lastActionNumber++;
203
204 int timeout = jobBean.getTimeout();
205 log.debug(origStart.getTime() + " Materializing action for time=" + effStart.getTime()
206 + ", lastactionnumber=" + lastActionNumber);
207 Date actualTime = new Date();
208 action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
209 effStart.getTime(), actualTime, lastActionNumber, conf, actionBean);
210 int catchUpTOMultiplier = 1; // This value might be could be changed in future
211 if (actionBean.getNominalTimestamp().before(jobBean.getCreatedTimestamp())) {
212 // Catchup action
213 timeout = catchUpTOMultiplier * timeout;
214 // actionBean.setTimeOut(Services.get().getConf().getInt(CONF_DEFAULT_TIMEOUT_CATCHUP,
215 // -1));
216 log.info("Catchup timeout is :" + actionBean.getTimeOut());
217 }
218 actionBean.setTimeOut(timeout);
219
220 if (!dryrun) {
221 storeToDB(actionBean, action, store); // Storing to table
222 }
223 else {
224 actionStrings.append("action for new instance");
225 actionStrings.append(action);
226 }
227 // Restore the original start time
228 effStart = (Calendar) origStart.clone();
229 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
230 }
231
232 endTime = new Date(effStart.getTimeInMillis());
233 if (!dryrun) {
234 return action;
235 }
236 else {
237 return actionStrings.toString();
238 }
239 }
240
241 /**
242 * Store an Action into database table.
243 *
244 * @param actionBean
245 * @param actionXml
246 * @param store
247 * @param wantSla
248 * @throws StoreException
249 * @throws JDOMException
250 */
251 private void storeToDB(CoordinatorActionBean actionBean, String actionXml, CoordinatorStore store) throws Exception {
252 log.debug("In storeToDB() action Id " + actionBean.getId() + " Size of actionXml " + actionXml.length());
253 actionBean.setActionXml(actionXml);
254 insertList.add(actionBean);
255 createActionRegistration(actionXml, actionBean, store);
256
257 // TODO: time 100s should be configurable
258 queueCallable(new CoordActionNotificationXCommand(actionBean), 100);
259 queueCallable(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100);
260 }
261
262 /**
263 * @param actionXml
264 * @param actionBean
265 * @param store
266 * @throws Exception
267 */
268 private void createActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store)
269 throws Exception {
270 Element eAction = XmlUtils.parseXml(actionXml);
271 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
272 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, store, actionBean.getId(),
273 SlaAppType.COORDINATOR_ACTION, user, group);
274 if(slaEvent != null) {
275 insertList.add(slaEvent);
276 }
277 }
278
279 /**
280 * @param job
281 * @param store
282 * @throws StoreException
283 */
284 private void updateJobTable(CoordinatorJobBean job, CoordinatorStore store) {
285 // TODO: why do we need this? Isn't lastMatTime enough???
286 job.setLastActionTime(endTime);
287 job.setLastActionNumber(lastActionNumber);
288 // if the job endtime == action endtime, then set status of job to
289 // succeeded
290 // we dont need to materialize this job anymore
291 Date jobEndTime = job.getEndTime();
292 if (jobEndTime.compareTo(endTime) <= 0) {
293 job.setStatus(CoordinatorJob.Status.SUCCEEDED);
294 log.info("[" + job.getId() + "]: Update status from PREMATER to SUCCEEDED");
295 }
296 else {
297 job.setStatus(CoordinatorJob.Status.RUNNING);
298 log.info("[" + job.getId() + "]: Update status from PREMATER to RUNNING");
299 }
300 job.setNextMaterializedTime(endTime);
301 updateList.add(job);
302 }
303
304 @Override
305 protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
306 log.info("STARTED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime + ", endTime="
307 + endTime);
308 try {
309 if (lock(jobId)) {
310 call(store);
311 JPAService jpaService = Services.get().get(JPAService.class);
312 if (jpaService != null) {
313 try {
314 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
315 }
316 catch (JPAExecutorException je) {
317 throw new CommandException(je);
318 }
319 }
320 else {
321 throw new CommandException(ErrorCode.E0610);
322 }
323 }
324 else {
325 queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime),
326 LOCK_FAILURE_REQUEUE_INTERVAL);
327 log.warn("CoordActionMaterializeCommand lock was not acquired - failed jobId=" + jobId
328 + ". Requeing the same.");
329 }
330 }
331 catch (InterruptedException e) {
332 queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime), LOCK_FAILURE_REQUEUE_INTERVAL);
333 log.warn("CoordActionMaterializeCommand lock acquiring failed with exception " + e.getMessage()
334 + " for jobId=" + jobId + " Requeing the same.");
335 }
336 finally {
337 log.info(" ENDED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime
338 + ", endTime=" + endTime);
339 }
340 return null;
341 }
342
343
344
345 /**
346 * For preliminery testing. Should be removed soon
347 *
348 * @param args
349 * @throws Exception
350 */
351 public static void main(String[] args) throws Exception {
352 new Services().init();
353 try {
354 Date startTime = DateUtils.parseDateUTC("2009-02-01T01:00Z");
355 Date endTime = DateUtils.parseDateUTC("2009-02-02T01:00Z");
356 String jobId = "0000000-091207151850551-oozie-dani-C";
357 CoordActionMaterializeCommand matCmd = new CoordActionMaterializeCommand(jobId, startTime, endTime);
358 matCmd.call();
359 }
360 finally {
361 try {
362 Thread.sleep(60000);
363 }
364 catch (Exception ex) {
365 }
366 new Services().destroy();
367 }
368 }
369
370 }