This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.Calendar;
023 import java.util.Date;
024 import java.util.TimeZone;
025
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.oozie.CoordinatorActionBean;
028 import org.apache.oozie.CoordinatorJobBean;
029 import org.apache.oozie.ErrorCode;
030 import org.apache.oozie.client.CoordinatorJob;
031 import org.apache.oozie.client.SLAEvent.SlaAppType;
032 import org.apache.oozie.command.CommandException;
033 import org.apache.oozie.coord.TimeUnit;
034 import org.apache.oozie.service.Service;
035 import org.apache.oozie.service.Services;
036 import org.apache.oozie.store.CoordinatorStore;
037 import org.apache.oozie.store.StoreException;
038 import org.apache.oozie.util.DateUtils;
039 import org.apache.oozie.util.Instrumentation;
040 import org.apache.oozie.util.XConfiguration;
041 import org.apache.oozie.util.XLog;
042 import org.apache.oozie.util.XmlUtils;
043 import org.apache.oozie.util.db.SLADbOperations;
044 import org.jdom.Element;
045 import org.jdom.JDOMException;
046
047 public class CoordActionMaterializeCommand extends CoordinatorCommand<Void> {
048 private String jobId;
049 private Date startTime;
050 private Date endTime;
051 private int lastActionNumber = 1; // over-ride by DB value
052 private final XLog log = XLog.getLog(getClass());
053 private String user;
054 private String group;
055 /**
056 * Default timeout for catchup jobs, in minutes, after which coordinator input check will timeout
057 */
058 public static final String CONF_DEFAULT_TIMEOUT_CATCHUP = Service.CONF_PREFIX + "coord.catchup.default.timeout";
059
060 public CoordActionMaterializeCommand(String jobId, Date startTime, Date endTime) {
061 super("coord_action_mater", "coord_action_mater", 1, XLog.STD);
062 this.jobId = jobId;
063 this.startTime = startTime;
064 this.endTime = endTime;
065 }
066
067 @Override
068 protected Void call(CoordinatorStore store) throws StoreException, CommandException {
069 // CoordinatorJobBean job = store.getCoordinatorJob(jobId, true);
070 CoordinatorJobBean job = store.getEntityManager().find(CoordinatorJobBean.class, jobId);
071 setLogInfo(job);
072 if (job.getLastActionTime() != null && job.getLastActionTime().compareTo(endTime) >= 0) {
073 log.info("ENDED Coordinator materialization for jobId = " + jobId
074 + " Action is *already* materialized for Materialization start time = " + startTime + " : Materialization end time = " + endTime + " Job status = " + job.getStatusStr());
075 return null;
076 }
077
078 if (endTime.after(job.getEndTime())) {
079 log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization end time = " + endTime
080 + " surpasses coordinator job's end time = " + job.getEndTime() + " Job status = " + job.getStatusStr());
081 return null;
082 }
083
084 if (job.getPauseTime() != null && !startTime.before(job.getPauseTime())) {
085 log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization start time = " + startTime
086 + " is after or equal to coordinator job's pause time = " + job.getPauseTime() + " Job status = " + job.getStatusStr());
087 // pausetime blocks real materialization - we change job's status back to RUNNING;
088 if (job.getStatus() == CoordinatorJob.Status.PREMATER) {
089 job.setStatus(CoordinatorJob.Status.RUNNING);
090 }
091 store.updateCoordinatorJob(job);
092 return null;
093 }
094
095 this.user = job.getUser();
096 this.group = job.getGroup();
097
098 if (job.getStatus().equals(CoordinatorJobBean.Status.PREMATER)) {
099 Configuration jobConf = null;
100 log.debug("start job :" + jobId + " Materialization ");
101 try {
102 jobConf = new XConfiguration(new StringReader(job.getConf()));
103 }
104 catch (IOException ioe) {
105 log.warn("Configuration parse error. read from DB :" + job.getConf(), ioe);
106 throw new CommandException(ErrorCode.E1005, ioe);
107 }
108
109 Instrumentation.Cron cron = new Instrumentation.Cron();
110 cron.start();
111 try {
112 materializeJobs(false, job, jobConf, store);
113 updateJobTable(job, store);
114 }
115 catch (CommandException ex) {
116 log.warn("Exception occurs:" + ex + " Making the job failed ");
117 job.setStatus(CoordinatorJobBean.Status.FAILED);
118 store.updateCoordinatorJob(job);
119 }
120 catch (Exception e) {
121 log.error("Excepion thrown :", e);
122 throw new CommandException(ErrorCode.E1001, e.getMessage(), e);
123 }
124 cron.stop();
125 }
126 else {
127 log.info("WARN: action is not in PREMATER state! It's in state=" + job.getStatus());
128 }
129 return null;
130 }
131
132 /**
133 * Create action instances starting from "start-time" to end-time" and store them into Action table.
134 *
135 * @param dryrun
136 * @param jobBean
137 * @param conf
138 * @param store
139 * @throws Exception
140 */
141 protected String materializeJobs(boolean dryrun, CoordinatorJobBean jobBean, Configuration conf,
142 CoordinatorStore store) throws Exception {
143 String jobXml = jobBean.getJobXml();
144 Element eJob = XmlUtils.parseXml(jobXml);
145 // TODO: always UTC?
146 TimeZone appTz = DateUtils.getTimeZone(jobBean.getTimeZone());
147 // TimeZone appTz = DateUtils.getTimeZone("UTC");
148 int frequency = jobBean.getFrequency();
149 TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
150 TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
151 Calendar start = Calendar.getInstance(appTz);
152 start.setTime(startTime);
153 DateUtils.moveToEnd(start, endOfFlag);
154 Calendar end = Calendar.getInstance(appTz);
155 end.setTime(endTime);
156 lastActionNumber = jobBean.getLastActionNumber();
157 // DateUtils.moveToEnd(end, endOfFlag);
158 log.info(" *** materialize Actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime()
159 + ", end=" + end.getTime() + "\n TimeUNIT " + freqTU.getCalendarUnit() + " Frequency :" + frequency
160 + ":" + freqTU + " lastActionNumber " + lastActionNumber);
161 // Keep the actual start time
162 Calendar origStart = Calendar.getInstance(appTz);
163 origStart.setTime(jobBean.getStartTimestamp());
164 // Move to the End of duration, if needed.
165 DateUtils.moveToEnd(origStart, endOfFlag);
166 // Cloning the start time to be used in loop iteration
167 Calendar effStart = (Calendar) origStart.clone();
168 // Move the time when the previous action finished
169 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
170
171 String action = null;
172 StringBuilder actionStrings = new StringBuilder();
173 Date jobPauseTime = jobBean.getPauseTime();
174 Calendar pause = null;
175 if (jobPauseTime != null) {
176 pause = Calendar.getInstance(appTz);
177 pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
178 }
179
180 while (effStart.compareTo(end) < 0) {
181 if (pause != null && effStart.compareTo(pause) >= 0) {
182 break;
183 }
184 CoordinatorActionBean actionBean = new CoordinatorActionBean();
185 lastActionNumber++;
186
187 int timeout = jobBean.getTimeout();
188 log.debug(origStart.getTime() + " Materializing action for time=" + effStart.getTime()
189 + ", lastactionnumber=" + lastActionNumber);
190 Date actualTime = new Date();
191 action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
192 effStart.getTime(), actualTime, lastActionNumber, conf, actionBean);
193 int catchUpTOMultiplier = 1; // This value might be could be changed in future
194 if (actionBean.getNominalTimestamp().before(jobBean.getCreatedTimestamp())) {
195 // Catchup action
196 timeout = catchUpTOMultiplier * timeout;
197 // actionBean.setTimeOut(Services.get().getConf().getInt(CONF_DEFAULT_TIMEOUT_CATCHUP,
198 // -1));
199 log.info("Catchup timeout is :" + actionBean.getTimeOut());
200 }
201 actionBean.setTimeOut(timeout);
202
203 if (!dryrun) {
204 storeToDB(actionBean, action, store); // Storing to table
205 }
206 else {
207 actionStrings.append("action for new instance");
208 actionStrings.append(action);
209 }
210 // Restore the original start time
211 effStart = (Calendar) origStart.clone();
212 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
213 }
214
215 endTime = new Date(effStart.getTimeInMillis());
216 if (!dryrun) {
217 return action;
218 }
219 else {
220 return actionStrings.toString();
221 }
222 }
223
224 /**
225 * Store an Action into database table.
226 *
227 * @param actionBean
228 * @param actionXml
229 * @param store
230 * @param wantSla
231 * @throws StoreException
232 * @throws JDOMException
233 */
234 private void storeToDB(CoordinatorActionBean actionBean, String actionXml, CoordinatorStore store) throws Exception {
235 log.debug("In storeToDB() action Id " + actionBean.getId() + " Size of actionXml " + actionXml.length());
236 actionBean.setActionXml(actionXml);
237 store.insertCoordinatorAction(actionBean);
238 writeActionRegistration(actionXml, actionBean, store);
239
240 // TODO: time 100s should be configurable
241 queueCallable(new CoordActionNotification(actionBean), 100);
242 queueCallable(new CoordActionInputCheckCommand(actionBean.getId()), 100);
243 }
244
245 /**
246 * @param actionXml
247 * @param actionBean
248 * @param store
249 * @throws Exception
250 */
251 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store)
252 throws Exception {
253 Element eAction = XmlUtils.parseXml(actionXml);
254 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
255 SLADbOperations.writeSlaRegistrationEvent(eSla, store, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user,
256 group);
257 }
258
259 /**
260 * @param job
261 * @param store
262 * @throws StoreException
263 */
264 private void updateJobTable(CoordinatorJobBean job, CoordinatorStore store) throws StoreException {
265 // TODO: why do we need this? Isn't lastMatTime enough???
266 job.setLastActionTime(endTime);
267 job.setLastActionNumber(lastActionNumber);
268 // if the job endtime == action endtime, then set status of job to
269 // succeeded
270 // we dont need to materialize this job anymore
271 Date jobEndTime = job.getEndTime();
272 if (jobEndTime.compareTo(endTime) <= 0) {
273 job.setStatus(CoordinatorJob.Status.SUCCEEDED);
274 log.info("[" + job.getId() + "]: Update status from PREMATER to SUCCEEDED");
275 }
276 else {
277 job.setStatus(CoordinatorJob.Status.RUNNING);
278 log.info("[" + job.getId() + "]: Update status from PREMATER to RUNNING");
279 }
280 job.setNextMaterializedTime(endTime);
281 store.updateCoordinatorJob(job);
282 }
283
284 @Override
285 protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
286 log.info("STARTED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime + ", endTime="
287 + endTime);
288 try {
289 if (lock(jobId)) {
290 call(store);
291 }
292 else {
293 queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime),
294 LOCK_FAILURE_REQUEUE_INTERVAL);
295 log.warn("CoordActionMaterializeCommand lock was not acquired - failed jobId=" + jobId
296 + ". Requeing the same.");
297 }
298 }
299 catch (InterruptedException e) {
300 queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime), LOCK_FAILURE_REQUEUE_INTERVAL);
301 log.warn("CoordActionMaterializeCommand lock acquiring failed with exception " + e.getMessage()
302 + " for jobId=" + jobId + " Requeing the same.");
303 }
304 finally {
305 log.info(" ENDED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime
306 + ", endTime=" + endTime);
307 }
308 return null;
309 }
310
311
312
313 /**
314 * For preliminery testing. Should be removed soon
315 *
316 * @param args
317 * @throws Exception
318 */
319 public static void main(String[] args) throws Exception {
320 new Services().init();
321 try {
322 Date startTime = DateUtils.parseDateUTC("2009-02-01T01:00Z");
323 Date endTime = DateUtils.parseDateUTC("2009-02-02T01:00Z");
324 String jobId = "0000000-091207151850551-oozie-dani-C";
325 CoordActionMaterializeCommand matCmd = new CoordActionMaterializeCommand(jobId, startTime, endTime);
326 matCmd.call();
327 }
328 finally {
329 try {
330 Thread.sleep(60000);
331 }
332 catch (Exception ex) {
333 }
334 new Services().destroy();
335 }
336 }
337
338 }