This project has retired. For details please refer to its Attic page.
001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.sql.Timestamp;
021    
022    import org.apache.oozie.CoordinatorJobBean;
023    import org.apache.oozie.client.CoordinatorJob;
024    import org.apache.oozie.command.CommandException;
025    import org.apache.oozie.store.CoordinatorStore;
026    import org.apache.oozie.store.StoreException;
027    import org.apache.oozie.util.DateUtils;
028    import org.apache.oozie.util.XLog;
029    
030    public class CoordJobMatLookupCommand extends CoordinatorCommand<Void> {
031        private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization;
032        
033        private final XLog log = XLog.getLog(getClass());
034        private int materializationWindow;
035        private String jobId;
036    
037        public CoordJobMatLookupCommand(String id, int materializationWindow) {
038            super("materialization_lookup", "materialization_lookup", 1, XLog.STD);
039            this.jobId = id;
040            this.materializationWindow = materializationWindow;
041        }
042    
043        @Override
044        protected Void call(CoordinatorStore store) throws StoreException, CommandException {
045            //CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, true);
046            CoordinatorJobBean coordJob = store.getEntityManager().find(CoordinatorJobBean.class, jobId);
047            setLogInfo(coordJob);
048    
049            if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING)) {
050                log.debug("CoordJobMatLookupCommand for jobId=" + jobId + " job is not in PREP or RUNNING but in "
051                        + coordJob.getStatus());
052                return null;
053            }
054    
055            if (coordJob.getNextMaterializedTimestamp() != null
056                    && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) {
057                log.debug("CoordJobMatLookupCommand for jobId=" + jobId + " job is already materialized");
058                return null;
059            }
060    
061            if (coordJob.getNextMaterializedTimestamp() != null
062                    && coordJob.getNextMaterializedTimestamp().compareTo(new Timestamp(System.currentTimeMillis())) >= 0) {
063                log.debug("CoordJobMatLookupCommand for jobId=" + jobId + " job is already materialized");
064                return null;
065            }
066    
067            Timestamp startTime = coordJob.getNextMaterializedTimestamp();
068            if (startTime == null) {
069                startTime = coordJob.getStartTimestamp();
070                
071                if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) {
072                    log.debug("CoordJobMatLookupCommand for jobId=" + jobId + " job's start time is not reached yet - nothing to materialize");
073                    return null;
074                }
075            }
076            // calculate end time by adding materializationWindow to start time.
077            // need to convert materializationWindow from secs to milliseconds
078            long startTimeMilli = startTime.getTime();
079            long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
080            Timestamp endTime = new Timestamp(endTimeMilli);
081            // if MaterializationWindow end time is greater than endTime
082            // for job, then set it to endTime of job
083            Timestamp jobEndTime = coordJob.getEndTimestamp();
084            if (endTime.compareTo(jobEndTime) > 0) {
085                endTime = jobEndTime;
086            }
087            // update status of job from PREP or RUNNING to PREMATER in coordJob
088            coordJob.setStatus(CoordinatorJob.Status.PREMATER);
089            store.updateCoordinatorJobStatus(coordJob);
090    
091            log.debug("Materializing coord job id=" + jobId + ", start=" + DateUtils.toDate(startTime) + ", end=" + DateUtils.toDate(endTime)
092                    + ", window=" + materializationWindow + ", status=PREMATER");
093            queueCallable(new CoordActionMaterializeCommand(jobId, DateUtils.toDate(startTime), DateUtils.toDate(endTime)),
094                    100);
095            return null;
096        }
097    
098        @Override
099        protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
100            log.info("STARTED CoordJobMatLookupCommand jobId=" + jobId + ", materializationWindow="
101                    + materializationWindow);
102            try {
103                if (lock(jobId)) {
104                    call(store);
105                }
106                else {
107                    queueCallable(new CoordJobMatLookupCommand(jobId, materializationWindow), LOCK_FAILURE_REQUEUE_INTERVAL);
108                    log.warn("CoordJobMatLookupCommand lock was not acquired - failed jobId=" + jobId
109                            + ". Requeing the same.");
110                }
111            }
112            catch (InterruptedException e) {
113                queueCallable(new CoordJobMatLookupCommand(jobId, materializationWindow), LOCK_FAILURE_REQUEUE_INTERVAL);
114                log.warn("CoordJobMatLookupCommand lock acquiring failed with exception " + e.getMessage() + " for jobId="
115                        + jobId + " Requeing the same.");
116            }
117            finally {
118                log.info("ENDED CoordJobMatLookupCommand jobId=" + jobId + ", materializationWindow="
119                        + materializationWindow);
120            }
121            return null;
122        }
123    }