001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.util.ArrayList;
021    import java.util.Date;
022    import java.util.List;
023    
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.oozie.CoordinatorJobBean;
026    import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
027    import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
028    import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
029    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
030    import org.apache.oozie.executor.jpa.CoordJobsToBeMaterializedJPAExecutor;
031    import org.apache.oozie.executor.jpa.JPAExecutorException;
032    import org.apache.oozie.util.XCallable;
033    import org.apache.oozie.util.XLog;
034    
035    /**
036     * The coordinator Materialization Lookup trigger service schedules a lookup trigger command at every interval (default
037     * is 5 minutes). This interval can be configured through the oozie configuration defined in either oozie-default.xml
038     * or oozie-site.xml using the property name oozie.service.CoordMaterializeTriggerService.lookup.interval
039     */
040    public class CoordMaterializeTriggerService implements Service {
041        public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService.";
042        /**
043         * Time interval, in seconds, at which the Job materialization service will be scheduled to run.
044         */
045        public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval";
046        /**
047         * This configuration defined the duration for which job should be materialized in future
048         */
049        public static final String CONF_MATERIALIZATION_WINDOW = CONF_PREFIX + "materialization.window";
050        /**
051         * The number of callables to be queued in a batch.
052         */
053        public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
054        /**
055         * The number of coordinator jobs to be picked for materialization at a given time.
056         */
057        public static final String CONF_MATERIALIZATION_SYSTEM_LIMIT = CONF_PREFIX + "materialization.system.limit";
058    
059        private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
060        private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
061        private static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
062        private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
063        private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50;
064    
065        /**
066         * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand.
067         */
068        static class CoordMaterializeTriggerRunnable implements Runnable {
069            private int materializationWindow;
070            private long delay = 0;
071            private List<XCallable<Void>> callables;
072            private List<XCallable<Void>> delayedCallables;
073    
074            public CoordMaterializeTriggerRunnable(int materializationWindow) {
075                this.materializationWindow = materializationWindow;
076            }
077    
078            @Override
079            public void run() {
080                runCoordJobMatLookup();
081    
082                if (null != callables) {
083                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
084                    if (ret == false) {
085                        XLog.getLog(getClass()).warn(
086                                "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
087                                        + "Most possibly command queue is full. Queue size is :"
088                                        + Services.get().get(CallableQueueService.class).queueSize());
089                    }
090                    callables = null;
091                }
092                if (null != delayedCallables) {
093                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
094                    if (ret == false) {
095                        XLog.getLog(getClass()).warn(
096                                "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
097                                        + "Most possibly Callable queue is full. Queue size is :"
098                                        + Services.get().get(CallableQueueService.class).queueSize());
099                    }
100                    delayedCallables = null;
101                    this.delay = 0;
102                }
103            }
104    
105            /**
106             * Recover coordinator jobs that should be materialized
107             */
108            private void runCoordJobMatLookup() {
109                XLog.Info.get().clear();
110                XLog LOG = XLog.getLog(getClass());
111                JPAService jpaService = Services.get().get(JPAService.class);
112                try {
113    
114                    // get current date
115                    Date currDate = new Date(new Date().getTime() + CONF_LOOKUP_INTERVAL_DEFAULT * 1000);
116                    // get list of all jobs that have actions that should be materialized.
117                    int materializationLimit = Services.get().getConf()
118                            .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT);
119                    CoordJobsToBeMaterializedJPAExecutor cmatcmd = new CoordJobsToBeMaterializedJPAExecutor(currDate,
120                            materializationLimit);
121                    List<CoordinatorJobBean> materializeJobs = jpaService.execute(cmatcmd);
122                    LOG.debug("CoordMaterializeTriggerService - Curr Date= " + currDate + ", Num jobs to materialize = "
123                            + materializeJobs.size());
124                    for (CoordinatorJobBean coordJob : materializeJobs) {
125                        Services.get().get(InstrumentationService.class).get()
126                                .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
127                        int numWaitingActions = jpaService
128                                .execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
129                        LOG.debug("Job :" + coordJob.getId() + "  numWaitingActions : " + numWaitingActions
130                                + " MatThrottle : " + coordJob.getMatThrottling());
131                        // update lastModifiedTime so next time others might have higher chance to get pick up
132                        coordJob.setLastModifiedTime(new Date());
133                        jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
134                        if (numWaitingActions >= coordJob.getMatThrottling()) {
135                            LOG.debug("Materialization skipped for JobID [" + coordJob.getId() + " already waiting "
136                                    + numWaitingActions + " actions. MatThrottle is : " + coordJob.getMatThrottling());
137                            continue;
138                        }
139                        queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
140    
141                    }
142    
143                }
144                catch (JPAExecutorException jex) {
145                    LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
146                }
147            }
148    
149            /**
150             * Adds callables to a list. If the number of callables in the list reaches {@link
151             * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list
152             * is reset.
153             *
154             * @param callable the callable to queue.
155             */
156            private void queueCallable(XCallable<Void> callable) {
157                if (callables == null) {
158                    callables = new ArrayList<XCallable<Void>>();
159                }
160                callables.add(callable);
161                if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
162                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
163                    if (ret == false) {
164                        XLog.getLog(getClass()).warn(
165                                "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
166                                        + "Most possibly command queue is full. Queue size is :"
167                                        + Services.get().get(CallableQueueService.class).queueSize());
168                    }
169                    callables = new ArrayList<XCallable<Void>>();
170                }
171            }
172    
173        }
174    
175        @Override
176        public void init(Services services) throws ServiceException {
177            Configuration conf = services.getConf();
178            Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(conf.getInt(
179                    CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT));// Default is 1 hour
180            services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10,
181                                                          conf.getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT),// Default is 5 minutes
182                                                          SchedulerService.Unit.SEC);
183        }
184    
185        @Override
186        public void destroy() {
187        }
188    
189        @Override
190        public Class<? extends Service> getInterface() {
191            return CoordMaterializeTriggerService.class;
192        }
193    
194    }