001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.util.Date;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Set;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.BundleJobBean;
028import org.apache.oozie.CoordinatorJobBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.command.bundle.BundleStatusTransitXCommand;
032import org.apache.oozie.command.coord.CoordStatusTransitXCommand;
033import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
034import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
035import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
036import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
037import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
038import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
039import org.apache.oozie.executor.jpa.JPAExecutorException;
040import org.apache.oozie.util.DateUtils;
041import org.apache.oozie.lock.LockToken;
042import org.apache.oozie.util.XLog;
043
044/**
045 * StateTransitService is scheduled to run at the configured interval.
046 * <p>
047 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job
048 * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
049 * SUCCEEDED.
050 */
051public class StatusTransitService implements Service {
052    private static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
053    private static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
054    public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX
055            + "backward.support.for.coord.status";
056    public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX
057            + "backward.support.for.states.without.error";
058    public static int limit = -1;
059    public static Date lastInstanceStartTime = null;
060    public final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
061
062    /**
063     * StateTransitRunnable is the runnable which is scheduled to run at the configured interval.
064     * <p>
065     * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0
066     * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
067     * SUCCEEDED.
068     */
069    public static class StatusTransitRunnable implements Runnable {
070        private JPAService jpaService = null;
071        private LockToken lock;
072
073        private Set<String> coordFailedIds = new HashSet<String>();
074        private Set<String> bundleFailedIds = new HashSet<String>();
075
076        public StatusTransitRunnable() {
077            jpaService = Services.get().get(JPAService.class);
078            if (jpaService == null) {
079                LOG.error("Missing JPAService");
080            }
081        }
082
083        @Override
084        public void run() {
085            try {
086                final Date curDate = new Date(); // records the start time of this service run;
087
088                // first check if there is some other instance running;
089                lock = Services.get().get(MemoryLocksService.class)
090                        .getWriteLock(StatusTransitService.class.getName(), lockTimeout);
091                if (lock == null) {
092                    LOG.info("This StatusTransitService instance"
093                            + " will not run since there is already an instance running");
094                }
095                else {
096                    LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
097                    coordTransit();
098                    bundleTransit();
099                    lastInstanceStartTime = curDate;
100                }
101            }
102            catch (Exception ex) {
103                LOG.warn("Exception happened during StatusTransitRunnable ", ex);
104            }
105            finally {
106                if (lock != null) {
107                    lock.release();
108                    LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
109                }
110            }
111        }
112
113        /**
114         * Aggregate bundle actions' status to bundle jobs
115         *
116         * @throws JPAExecutorException thrown if failed in db updates or retrievals
117         * @throws CommandException thrown if failed to run commands
118         */
119        private void bundleTransit() throws JPAExecutorException, CommandException {
120            List<BundleJobBean> pendingJobCheckList;
121            final Set<String> bundleIds = new HashSet<String>();
122
123            if (lastInstanceStartTime == null) {
124                LOG.info("Running bundle status service first instance");
125                // This is the first instance, we need to check for all pending or running jobs;
126                // TODO currently limit is = -1. Need to do actual batching
127                pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
128            }
129            else {
130                LOG.info("Running bundle status service from last instance time =  "
131                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
132                // this is not the first instance, we should only check jobs that have actions been
133                // updated >= start time of last service run;
134                pendingJobCheckList = BundleJobQueryExecutor.getInstance().getList(
135                        BundleJobQuery.GET_BUNDLE_IDS_FOR_STATUS_TRANSIT, lastInstanceStartTime);
136            }
137            for (BundleJobBean job : pendingJobCheckList) {
138                bundleIds.add(job.getId());
139            }
140            bundleIds.addAll(bundleFailedIds);
141            bundleFailedIds.clear();
142            for (final String jobId : bundleIds) {
143                try {
144                    new BundleStatusTransitXCommand(jobId).call();
145                }
146                catch (CommandException e) {
147                    // Unable to acquire lock. Will try next time
148                    if (e.getErrorCode() == ErrorCode.E0606) {
149                        bundleFailedIds.add(jobId);
150                        LOG.info("Unable to acquire lock for " + jobId + ". Will try next time");
151                    }
152                    else {
153                        LOG.error("Error running BundleStatusTransitXCommand for job " + jobId, e);
154                    }
155
156                }
157            }
158        }
159
160        /**
161         * Aggregate coordinator actions' status to coordinator jobs
162         *
163         * @throws JPAExecutorException thrown if failed in db updates or retrievals
164         * @throws CommandException thrown if failed to run commands
165         */
166        private void coordTransit() throws JPAExecutorException, CommandException {
167            List<CoordinatorJobBean> pendingJobCheckList = null;
168            final Set<String> coordIds = new HashSet<String>();
169            if (lastInstanceStartTime == null) {
170                LOG.info("Running coordinator status service first instance");
171                // this is the first instance, we need to check for all pending jobs;
172                pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
173            }
174            else {
175                LOG.info("Running coordinator status service from last instance time =  "
176                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
177                // this is not the first instance, we should only check jobs.
178                // that have actions or jobs been updated >= start time of last service run;
179                pendingJobCheckList = CoordJobQueryExecutor.getInstance().getList(
180                        CoordJobQuery.GET_COORD_IDS_FOR_STATUS_TRANSIT, lastInstanceStartTime);
181
182                pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList(
183                        CoordJobQuery.GET_COORD_JOBS_CHANGED, lastInstanceStartTime));
184            }
185            for (final CoordinatorJobBean job : pendingJobCheckList) {
186                coordIds.add(job.getId());
187            }
188            coordIds.addAll(coordFailedIds);
189            coordFailedIds.clear();
190            for (final String coordId : coordIds) {
191                try {
192                    new CoordStatusTransitXCommand(coordId).call();
193                }
194                catch (CommandException e) {
195                    // Unable to acquire lock. Will try next time
196                    if (e.getErrorCode() == ErrorCode.E0606) {
197                        coordFailedIds.add(coordId);
198                        LOG.info("Unable to acquire lock for " + coordId + ". Will try next time");
199
200                    }
201                    else {
202                        LOG.error("Error running CoordStatusTransitXCommand for job " + coordId, e);
203                    }
204
205                }
206            }
207        }
208    }
209
210    /**
211     * Initializes the {@link StatusTransitService}.
212     *
213     * @param services services instance.
214     */
215    @Override
216    public void init(Services services) {
217        final Configuration conf = services.getConf();
218        Runnable stateTransitRunnable = new StatusTransitRunnable();
219        services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
220                ConfigurationService.getInt(conf, CONF_STATUSTRANSIT_INTERVAL), SchedulerService.Unit.SEC);
221    }
222
223    /**
224     * Destroy the StateTransit Jobs Service.
225     */
226    @Override
227    public void destroy() {
228    }
229
230    /**
231     * Return the public interface for the purge jobs service.
232     *
233     * @return {@link StatusTransitService}.
234     */
235    @Override
236    public Class<? extends Service> getInterface() {
237        return StatusTransitService.class;
238    }
239
240}