001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.List;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
027import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
028import org.apache.oozie.executor.jpa.JPAExecutorException;
029import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
030import org.apache.oozie.lock.LockToken;
031import org.apache.oozie.util.XCallable;
032import org.apache.oozie.util.XLog;
033import org.apache.oozie.util.DateUtils;
034
035/**
036 * The coordinator Materialization Lookup trigger service schedule lookup trigger command for every interval (default is
037 * 5 minutes ). This interval could be configured through oozie configuration defined is either oozie-default.xml or
038 * oozie-site.xml using the property name oozie.service.CoordMaterializeTriggerService.lookup.interval
039 */
040public class CoordMaterializeTriggerService implements Service {
041    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService.";
042    /**
043     * Time interval, in seconds, at which the Job materialization service will be scheduled to run.
044     */
045    public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval";
046
047    public static final String CONF_SCHEDULING_INTERVAL = CONF_PREFIX + "scheduling.interval";
048    /**
049     * This configuration defined the duration for which job should be materialized in future
050     */
051    public static final String CONF_MATERIALIZATION_WINDOW = CONF_PREFIX + "materialization.window";
052    /**
053     * The number of callables to be queued in a batch.
054     */
055    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
056    /**
057     * The number of coordinator jobs to be picked for materialization at a given time.
058     */
059    public static final String CONF_MATERIALIZATION_SYSTEM_LIMIT = CONF_PREFIX + "materialization.system.limit";
060
061    private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
062    private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
063    public static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
064    private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
065    private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50;
066
067    /**
068     * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand.
069     */
070    static class CoordMaterializeTriggerRunnable implements Runnable {
071        private int materializationWindow;
072        private int lookupInterval;
073        private long delay = 0;
074        private List<XCallable<Void>> callables;
075        private List<XCallable<Void>> delayedCallables;
076        private XLog LOG = XLog.getLog(getClass());
077
078
079        public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) {
080            this.materializationWindow = materializationWindow;
081            this.lookupInterval = lookupInterval;
082        }
083
084        @Override
085        public void run() {
086            LockToken lock = null;
087
088            // first check if there is some other running instance from the same service;
089            try {
090                lock = Services.get().get(MemoryLocksService.class)
091                        .getWriteLock(CoordMaterializeTriggerService.class.getName(), lockTimeout);
092
093                if (lock != null) {
094                    runCoordJobMatLookup();
095                    if (null != callables) {
096                        boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
097                        if (ret == false) {
098                            XLog.getLog(getClass()).warn(
099                                    "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
100                                            + "Most possibly command queue is full. Queue size is :"
101                                            + Services.get().get(CallableQueueService.class).queueSize());
102                        }
103                        callables = null;
104                    }
105                    if (null != delayedCallables) {
106                        boolean ret = Services.get().get(CallableQueueService.class)
107                                .queueSerial(delayedCallables, this.delay);
108                        if (ret == false) {
109                            XLog.getLog(getClass()).warn(
110                                    "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
111                                            + "Most possibly Callable queue is full. Queue size is :"
112                                            + Services.get().get(CallableQueueService.class).queueSize());
113                        }
114                        delayedCallables = null;
115                        this.delay = 0;
116                    }
117                }
118
119                else {
120                    LOG.debug("Can't obtain lock, skipping");
121                }
122            }
123            catch (Exception e) {
124                LOG.error("Exception", e);
125            }
126            finally {
127                if (lock != null) {
128                    lock.release();
129                    LOG.info("Released lock for [{0}]", CoordMaterializeTriggerService.class.getName());
130                }
131
132            }
133
134        }
135
136        /**
137         * Recover coordinator jobs that should be materialized
138         */
139        private void runCoordJobMatLookup() {
140            XLog.Info.get().clear();
141            XLog LOG = XLog.getLog(getClass());
142            try {
143                // get current date
144                Date currDate = new Date(new Date().getTime() + lookupInterval * 1000);
145                // get list of all jobs that have actions that should be materialized.
146                int materializationLimit = Services.get().getConf()
147                        .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT);
148                materializeCoordJobs(currDate, materializationLimit, LOG);
149            }
150
151            catch (Exception ex) {
152                LOG.error("Exception while attempting to materialize coordinator jobs, {0}", ex.getMessage(), ex);
153            }
154        }
155
156        private void materializeCoordJobs(Date currDate, int limit, XLog LOG) throws JPAExecutorException {
157            try {
158                List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList(
159                        CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERILZATION, currDate, limit);
160                LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
161                        + ", Num jobs to materialize = " + materializeJobs.size());
162                for (CoordinatorJobBean coordJob : materializeJobs) {
163                    Services.get().get(InstrumentationService.class).get()
164                            .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
165                    queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
166                    coordJob.setLastModifiedTime(new Date());
167                    // TODO In place of calling single query, we should call bulk update.
168                    CoordJobQueryExecutor.getInstance().executeUpdate(
169                            CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, coordJob);
170
171                }
172            }
173            catch (JPAExecutorException jex) {
174                LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
175            }
176        }
177
178        /**
179         * Adds callables to a list. If the number of callables in the list reaches {@link
180         * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list
181         * is reset.
182         *
183         * @param callable the callable to queue.
184         */
185        private void queueCallable(XCallable<Void> callable) {
186            if (callables == null) {
187                callables = new ArrayList<XCallable<Void>>();
188            }
189            callables.add(callable);
190            if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
191                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
192                if (ret == false) {
193                    XLog.getLog(getClass()).warn(
194                            "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
195                                    + "Most possibly command queue is full. Queue size is :"
196                                    + Services.get().get(CallableQueueService.class).queueSize());
197                }
198                callables = new ArrayList<XCallable<Void>>();
199            }
200        }
201
202    }
203
204    @Override
205    public void init(Services services) throws ServiceException {
206        Configuration conf = services.getConf();
207        // default is 3600sec (1hr)
208        int materializationWindow = conf.getInt(CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT);
209        // default is 300sec (5min)
210        int lookupInterval = Services.get().getConf().getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT);
211        // default is 300sec (5min)
212        int schedulingInterval = Services.get().getConf().getInt(CONF_SCHEDULING_INTERVAL, lookupInterval);
213
214        Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval);
215
216        services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, schedulingInterval,
217                                                      SchedulerService.Unit.SEC);
218    }
219
220    @Override
221    public void destroy() {
222    }
223
224    @Override
225    public Class<? extends Service> getInterface() {
226        return CoordMaterializeTriggerService.class;
227    }
228
229}