001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.Date;
022    import java.util.List;
023    
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.oozie.CoordinatorJobBean;
026    import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
027    import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
028    import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
029    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
030    import org.apache.oozie.executor.jpa.CoordJobsToBeMaterializedJPAExecutor;
031    import org.apache.oozie.executor.jpa.JPAExecutorException;
032    import org.apache.oozie.util.XCallable;
033    import org.apache.oozie.util.XLog;
034    
/**
 * The coordinator materialization lookup trigger service schedules a lookup trigger command at every interval
 * (default is 5 minutes). This interval can be configured through the Oozie configuration, defined in either
 * oozie-default.xml or oozie-site.xml, using the property name
 * oozie.service.CoordMaterializeTriggerService.lookup.interval
 */
040    public class CoordMaterializeTriggerService implements Service {
041        public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService.";
042        /**
043         * Time interval, in seconds, at which the Job materialization service will be scheduled to run.
044         */
045        public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval";
046        /**
047         * This configuration defined the duration for which job should be materialized in future
048         */
049        public static final String CONF_MATERIALIZATION_WINDOW = CONF_PREFIX + "materialization.window";
050        /**
051         * The number of callables to be queued in a batch.
052         */
053        public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
054        /**
055         * The number of coordinator jobs to be picked for materialization at a given time.
056         */
057        public static final String CONF_MATERIALIZATION_SYSTEM_LIMIT = CONF_PREFIX + "materialization.system.limit";
058    
059        private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
060        private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
061        private static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
062        private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
063        private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50;
064    
065        /**
066         * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand.
067         */
068        static class CoordMaterializeTriggerRunnable implements Runnable {
069            private int materializationWindow;
070            private int lookupInterval;
071            private long delay = 0;
072            private List<XCallable<Void>> callables;
073            private List<XCallable<Void>> delayedCallables;
074    
075            public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) {
076                this.materializationWindow = materializationWindow;
077                this.lookupInterval = lookupInterval;
078            }
079    
080            @Override
081            public void run() {
082                runCoordJobMatLookup();
083    
084                if (null != callables) {
085                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
086                    if (ret == false) {
087                        XLog.getLog(getClass()).warn(
088                                "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
089                                        + "Most possibly command queue is full. Queue size is :"
090                                        + Services.get().get(CallableQueueService.class).queueSize());
091                    }
092                    callables = null;
093                }
094                if (null != delayedCallables) {
095                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
096                    if (ret == false) {
097                        XLog.getLog(getClass()).warn(
098                                "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
099                                        + "Most possibly Callable queue is full. Queue size is :"
100                                        + Services.get().get(CallableQueueService.class).queueSize());
101                    }
102                    delayedCallables = null;
103                    this.delay = 0;
104                }
105            }
106    
107            /**
108             * Recover coordinator jobs that should be materialized
109             */
110            private void runCoordJobMatLookup() {
111                XLog.Info.get().clear();
112                XLog LOG = XLog.getLog(getClass());
113                JPAService jpaService = Services.get().get(JPAService.class);
114                try {
115    
116                    // get current date
117                    Date currDate = new Date(new Date().getTime() + lookupInterval * 1000);
118                    // get list of all jobs that have actions that should be materialized.
119                    int materializationLimit = Services.get().getConf()
120                            .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT);
121                    CoordJobsToBeMaterializedJPAExecutor cmatcmd = new CoordJobsToBeMaterializedJPAExecutor(currDate,
122                            materializationLimit);
123                    List<CoordinatorJobBean> materializeJobs = jpaService.execute(cmatcmd);
124                    LOG.info("CoordMaterializeTriggerService - Curr Date= " + currDate + ", Num jobs to materialize = "
125                            + materializeJobs.size());
126                    for (CoordinatorJobBean coordJob : materializeJobs) {
127                        Services.get().get(InstrumentationService.class).get()
128                                .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
129                        int numWaitingActions = jpaService
130                                .execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
131                        LOG.info("Job :" + coordJob.getId() + "  numWaitingActions : " + numWaitingActions
132                                + " MatThrottle : " + coordJob.getMatThrottling());
133                        // update lastModifiedTime so next time others might have higher chance to get pick up
134                        coordJob.setLastModifiedTime(new Date());
135                        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
136                        if (numWaitingActions >= coordJob.getMatThrottling()) {
137                            LOG.info("info for JobID [" + coordJob.getId() + " already waiting "
138                                    + numWaitingActions + " actions. MatThrottle is : " + coordJob.getMatThrottling());
139                            continue;
140                        }
141                        queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
142    
143                    }
144    
145                }
146                catch (JPAExecutorException jex) {
147                    LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
148                }
149            }
150    
151            /**
152             * Adds callables to a list. If the number of callables in the list reaches {@link
153             * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list
154             * is reset.
155             *
156             * @param callable the callable to queue.
157             */
158            private void queueCallable(XCallable<Void> callable) {
159                if (callables == null) {
160                    callables = new ArrayList<XCallable<Void>>();
161                }
162                callables.add(callable);
163                if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
164                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
165                    if (ret == false) {
166                        XLog.getLog(getClass()).warn(
167                                "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
168                                        + "Most possibly command queue is full. Queue size is :"
169                                        + Services.get().get(CallableQueueService.class).queueSize());
170                    }
171                    callables = new ArrayList<XCallable<Void>>();
172                }
173            }
174    
175        }
176    
177        @Override
178        public void init(Services services) throws ServiceException {
179            Configuration conf = services.getConf();
180            // default is 3600sec (1hr)
181            int materializationWindow = conf.getInt(CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT);
182            // default is 300sec (5min)
183            int lookupInterval = Services.get().getConf().getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT);
184    
185            Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval);
186    
187            services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, lookupInterval,
188                                                          SchedulerService.Unit.SEC);
189        }
190    
191        @Override
192        public void destroy() {
193        }
194    
195        @Override
196        public Class<? extends Service> getInterface() {
197            return CoordMaterializeTriggerService.class;
198        }
199    
200    }