This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.sql.Timestamp;
021
022 import org.apache.oozie.CoordinatorJobBean;
023 import org.apache.oozie.client.CoordinatorJob;
024 import org.apache.oozie.command.CommandException;
025 import org.apache.oozie.store.CoordinatorStore;
026 import org.apache.oozie.store.StoreException;
027 import org.apache.oozie.util.DateUtils;
028 import org.apache.oozie.util.XLog;
029
030 public class CoordJobMatLookupCommand extends CoordinatorCommand<Void> {
031 private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization;
032
033 private final XLog log = XLog.getLog(getClass());
034 private int materializationWindow;
035 private String jobId;
036
037 public CoordJobMatLookupCommand(String id, int materializationWindow) {
038 super("materialization_lookup", "materialization_lookup", 1, XLog.STD);
039 this.jobId = id;
040 this.materializationWindow = materializationWindow;
041 }
042
043 @Override
044 protected Void call(CoordinatorStore store) throws StoreException, CommandException {
045 //CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, true);
046 CoordinatorJobBean coordJob = store.getEntityManager().find(CoordinatorJobBean.class, jobId);
047 setLogInfo(coordJob);
048
049 if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING)) {
050 log.debug("CoordJobMatLookupCommand for jobId=" + jobId + " job is not in PREP or RUNNING but in "
051 + coordJob.getStatus());
052 return null;
053 }
054
055 if (coordJob.getNextMaterializedTimestamp() != null
056 && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) {
057 log.debug("CoordJobMatLookupCommand for jobId=" + jobId + " job is already materialized");
058 return null;
059 }
060
061 if (coordJob.getNextMaterializedTimestamp() != null
062 && coordJob.getNextMaterializedTimestamp().compareTo(new Timestamp(System.currentTimeMillis())) >= 0) {
063 log.debug("CoordJobMatLookupCommand for jobId=" + jobId + " job is already materialized");
064 return null;
065 }
066
067 Timestamp startTime = coordJob.getNextMaterializedTimestamp();
068 if (startTime == null) {
069 startTime = coordJob.getStartTimestamp();
070
071 if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) {
072 log.debug("CoordJobMatLookupCommand for jobId=" + jobId + " job's start time is not reached yet - nothing to materialize");
073 return null;
074 }
075 }
076 // calculate end time by adding materializationWindow to start time.
077 // need to convert materializationWindow from secs to milliseconds
078 long startTimeMilli = startTime.getTime();
079 long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
080 Timestamp endTime = new Timestamp(endTimeMilli);
081 // if MaterializationWindow end time is greater than endTime
082 // for job, then set it to endTime of job
083 Timestamp jobEndTime = coordJob.getEndTimestamp();
084 if (endTime.compareTo(jobEndTime) > 0) {
085 endTime = jobEndTime;
086 }
087 // update status of job from PREP or RUNNING to PREMATER in coordJob
088 coordJob.setStatus(CoordinatorJob.Status.PREMATER);
089 store.updateCoordinatorJobStatus(coordJob);
090
091 log.debug("Materializing coord job id=" + jobId + ", start=" + DateUtils.toDate(startTime) + ", end=" + DateUtils.toDate(endTime)
092 + ", window=" + materializationWindow + ", status=PREMATER");
093 queueCallable(new CoordActionMaterializeCommand(jobId, DateUtils.toDate(startTime), DateUtils.toDate(endTime)),
094 100);
095 return null;
096 }
097
098 @Override
099 protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
100 log.info("STARTED CoordJobMatLookupCommand jobId=" + jobId + ", materializationWindow="
101 + materializationWindow);
102 try {
103 if (lock(jobId)) {
104 call(store);
105 }
106 else {
107 queueCallable(new CoordJobMatLookupCommand(jobId, materializationWindow), LOCK_FAILURE_REQUEUE_INTERVAL);
108 log.warn("CoordJobMatLookupCommand lock was not acquired - failed jobId=" + jobId
109 + ". Requeing the same.");
110 }
111 }
112 catch (InterruptedException e) {
113 queueCallable(new CoordJobMatLookupCommand(jobId, materializationWindow), LOCK_FAILURE_REQUEUE_INTERVAL);
114 log.warn("CoordJobMatLookupCommand lock acquiring failed with exception " + e.getMessage() + " for jobId="
115 + jobId + " Requeing the same.");
116 }
117 finally {
118 log.info("ENDED CoordJobMatLookupCommand jobId=" + jobId + ", materializationWindow="
119 + materializationWindow);
120 }
121 return null;
122 }
123 }