001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
027import org.apache.oozie.executor.jpa.BatchQueryExecutor;
028import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
029import org.apache.oozie.executor.jpa.JPAExecutorException;
030import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
031import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
032import org.apache.oozie.lock.LockToken;
033import org.apache.oozie.util.XCallable;
034import org.apache.oozie.util.XLog;
035import org.apache.oozie.util.DateUtils;
036
037/**
038 * The coordinator Materialization Lookup trigger service schedule lookup trigger command for every interval (default is
039 * 5 minutes ). This interval could be configured through oozie configuration defined is either oozie-default.xml or
040 * oozie-site.xml using the property name oozie.service.CoordMaterializeTriggerService.lookup.interval
041 */
042public class CoordMaterializeTriggerService implements Service {
043    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService.";
044    /**
045     * Time interval, in seconds, at which the Job materialization service will be scheduled to run.
046     */
047    public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval";
048
049    public static final String CONF_SCHEDULING_INTERVAL = CONF_PREFIX + "scheduling.interval";
050    /**
051     * This configuration defined the duration for which job should be materialized in future
052     */
053    public static final String CONF_MATERIALIZATION_WINDOW = CONF_PREFIX + "materialization.window";
054    /**
055     * The number of callables to be queued in a batch.
056     */
057    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
058    /**
059     * The number of coordinator jobs to be picked for materialization at a given time.
060     */
061    public static final String CONF_MATERIALIZATION_SYSTEM_LIMIT = CONF_PREFIX + "materialization.system.limit";
062
063    private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
064    private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
065
066    /**
067     * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand.
068     */
069    static class CoordMaterializeTriggerRunnable implements Runnable {
070        private int materializationWindow;
071        private int lookupInterval;
072        private long delay = 0;
073        private List<XCallable<Void>> callables;
074        private List<XCallable<Void>> delayedCallables;
075        private XLog LOG = XLog.getLog(getClass());
076
077
078        public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) {
079            this.materializationWindow = materializationWindow;
080            this.lookupInterval = lookupInterval;
081        }
082
083        @Override
084        public void run() {
085            LockToken lock = null;
086
087            // first check if there is some other running instance from the same service;
088            try {
089                lock = Services.get().get(MemoryLocksService.class)
090                        .getWriteLock(CoordMaterializeTriggerService.class.getName(), lockTimeout);
091
092                if (lock != null) {
093                    runCoordJobMatLookup();
094                    if (null != callables) {
095                        boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
096                        if (ret == false) {
097                            XLog.getLog(getClass()).warn(
098                                    "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
099                                            + "Most possibly command queue is full. Queue size is :"
100                                            + Services.get().get(CallableQueueService.class).queueSize());
101                        }
102                        callables = null;
103                    }
104                    if (null != delayedCallables) {
105                        boolean ret = Services.get().get(CallableQueueService.class)
106                                .queueSerial(delayedCallables, this.delay);
107                        if (ret == false) {
108                            XLog.getLog(getClass()).warn(
109                                    "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
110                                            + "Most possibly Callable queue is full. Queue size is :"
111                                            + Services.get().get(CallableQueueService.class).queueSize());
112                        }
113                        delayedCallables = null;
114                        this.delay = 0;
115                    }
116                }
117
118                else {
119                    LOG.debug("Can't obtain lock, skipping");
120                }
121            }
122            catch (Exception e) {
123                LOG.error("Exception", e);
124            }
125            finally {
126                if (lock != null) {
127                    lock.release();
128                    LOG.info("Released lock for [{0}]", CoordMaterializeTriggerService.class.getName());
129                }
130
131            }
132
133        }
134
135        /**
136         * Recover coordinator jobs that should be materialized
137         * @throws JPAExecutorException
138         */
139        private void runCoordJobMatLookup() throws JPAExecutorException {
140            List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
141            XLog.Info.get().clear();
142            XLog LOG = XLog.getLog(getClass());
143            try {
144                // get current date
145                Date currDate = new Date(new Date().getTime() + lookupInterval * 1000);
146                // get list of all jobs that have actions that should be materialized.
147                int materializationLimit = ConfigurationService.getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT);
148                materializeCoordJobs(currDate, materializationLimit, LOG, updateList);
149            }
150
151            catch (Exception ex) {
152                LOG.error("Exception while attempting to materialize coordinator jobs, {0}", ex.getMessage(), ex);
153            }
154            finally {
155                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
156            }
157        }
158
159        private void materializeCoordJobs(Date currDate, int limit, XLog LOG, List<UpdateEntry> updateList)
160                throws JPAExecutorException {
161            try {
162                List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList(
163                        CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION, currDate, limit);
164                LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
165                        + ", Num jobs to materialize = " + materializeJobs.size());
166                for (CoordinatorJobBean coordJob : materializeJobs) {
167                    Services.get().get(InstrumentationService.class).get()
168                            .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
169                    queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
170                    coordJob.setLastModifiedTime(new Date());
171                    updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME,
172                            coordJob));
173                }
174            }
175            catch (JPAExecutorException jex) {
176                LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
177            }
178        }
179
180        /**
181         * Adds callables to a list. If the number of callables in the list reaches {@link
182         * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list
183         * is reset.
184         *
185         * @param callable the callable to queue.
186         */
187        private void queueCallable(XCallable<Void> callable) {
188            if (callables == null) {
189                callables = new ArrayList<XCallable<Void>>();
190            }
191            callables.add(callable);
192            if (callables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) {
193                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
194                if (ret == false) {
195                    XLog.getLog(getClass()).warn(
196                            "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
197                                    + "Most possibly command queue is full. Queue size is :"
198                                    + Services.get().get(CallableQueueService.class).queueSize());
199                }
200                callables = new ArrayList<XCallable<Void>>();
201            }
202        }
203
204    }
205
206    @Override
207    public void init(Services services) throws ServiceException {
208        // default is 3600sec (1hr)
209        int materializationWindow = ConfigurationService.getInt(services.getConf(), CONF_MATERIALIZATION_WINDOW);
210        // default is 300sec (5min)
211        int lookupInterval = ConfigurationService.getInt(services.getConf(), CONF_LOOKUP_INTERVAL);
212        // default is 300sec (5min)
213        int schedulingInterval = Services.get().getConf().getInt(CONF_SCHEDULING_INTERVAL, lookupInterval);
214
215        Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval);
216
217        services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, schedulingInterval,
218                                                      SchedulerService.Unit.SEC);
219    }
220
221    @Override
222    public void destroy() {
223    }
224
225    @Override
226    public Class<? extends Service> getInterface() {
227        return CoordMaterializeTriggerService.class;
228    }
229
230}