001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.sql.Connection;
022import java.util.HashMap;
023import java.util.Map;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.client.WorkflowJob;
028import org.apache.oozie.service.SchemaService.SchemaName;
029import org.apache.oozie.store.Store;
030import org.apache.oozie.store.StoreException;
031import org.apache.oozie.store.WorkflowStore;
032import org.apache.oozie.util.Instrumentable;
033import org.apache.oozie.util.Instrumentation;
034import org.apache.oozie.util.XLog;
035import org.apache.oozie.workflow.WorkflowLib;
036import org.apache.oozie.workflow.lite.DBLiteWorkflowLib;
037
038public class DBLiteWorkflowStoreService extends LiteWorkflowStoreService implements Instrumentable {
039    private boolean selectForUpdate;
040    private XLog log;
041    private int statusWindow;
042
043    public static final String CONF_PREFIX = Service.CONF_PREFIX + "DBLiteWorkflowStoreService.";
044    public static final String CONF_METRICS_INTERVAL_MINS = CONF_PREFIX + "status.metrics.collection.interval";
045    public static final String CONF_METRICS_INTERVAL_WINDOW = CONF_PREFIX + "status.metrics.window";
046
047    private static final String INSTRUMENTATION_GROUP = "jobstatus";
048    private static final String INSTRUMENTATION_GROUP_WINDOW = "windowjobstatus";
049
050    private Map<String, Integer> statusCounts = new HashMap<String, Integer>();
051    private Map<String, Integer> statusWindowCounts = new HashMap<String, Integer>();
052
053    /**
054     * Gets the number of workflows for each status and populates the hash.
055     */
056    class JobStatusCountCallable implements Runnable {
057        @Override
058        public void run() {
059            WorkflowStore store = null;
060            try {
061                store = Services.get().get(WorkflowStoreService.class).create();
062                store.beginTrx();
063                WorkflowJob.Status[] wfStatusArr = WorkflowJob.Status.values();
064                for (WorkflowJob.Status aWfStatusArr : wfStatusArr) {
065                    statusCounts.put(aWfStatusArr.name(), store.getWorkflowCountWithStatus(aWfStatusArr.name()));
066                    statusWindowCounts.put(aWfStatusArr.name(), store.getWorkflowCountWithStatusInLastNSeconds(
067                            aWfStatusArr.name(), statusWindow));
068                }
069                store.commitTrx();
070            }
071            catch (StoreException e) {
072                if (store != null) {
073                    store.rollbackTrx();
074                }
075                log.warn("Exception while accessing the store", e);
076            }
077            catch (Exception ex) {
078                log.error("Exception, {0}", ex.getMessage(), ex);
079                if (store != null && store.isActive()) {
080                    try {
081                        store.rollbackTrx();
082                    }
083                    catch (RuntimeException rex) {
084                        log.warn("openjpa error, {0}", rex.getMessage(), rex);
085                    }
086                }
087            }
088            finally {
089                if (store != null) {
090                    if (!store.isActive()) {
091                        try {
092                            store.closeTrx();
093                        }
094                        catch (RuntimeException rex) {
095                            log.warn("Exception while attempting to close store", rex);
096                        }
097                    }
098                    else {
099                        log.warn("transaction is not committed or rolled back before closing entitymanager.");
100                    }
101                }
102            }
103        }
104    }
105
106    public void init(Services services) throws ServiceException {
107        Configuration conf = services.getConf();
108        statusWindow = ConfigurationService.getInt(conf, CONF_METRICS_INTERVAL_WINDOW);
109        int statusMetricsCollectionInterval = ConfigurationService.getInt(conf, CONF_METRICS_INTERVAL_MINS);
110        log = XLog.getLog(getClass());
111        selectForUpdate = false;
112
113        WorkflowJob.Status[] wfStatusArr = WorkflowJob.Status.values();
114        for (WorkflowJob.Status aWfStatusArr : wfStatusArr) {
115            statusCounts.put(aWfStatusArr.name(), 0);
116            statusWindowCounts.put(aWfStatusArr.name(), 0);
117        }
118        Runnable jobStatusCountCallable = new JobStatusCountCallable();
119        services.get(SchedulerService.class).schedule(jobStatusCountCallable, 1, statusMetricsCollectionInterval,
120                                                      SchedulerService.Unit.MIN);
121    }
122
123    public void destroy() {
124    }
125
126    /**
127     * Return the workflow lib without DB connection. Will be used for parsing purpose.
128     *
129     * @return Workflow Library
130     */
131    @Override
132    public WorkflowLib getWorkflowLibWithNoDB() {
133        return getWorkflowLib(null);
134    }
135
136    private WorkflowLib getWorkflowLib(Connection conn) {
137        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
138        return new DBLiteWorkflowLib(schema, LiteControlNodeHandler.class,
139                                     LiteDecisionHandler.class, LiteActionHandler.class, conn);
140    }
141
142    @Override
143    public WorkflowStore create() throws StoreException {
144        try {
145            return new WorkflowStore(selectForUpdate);
146        }
147        catch (Exception ex) {
148            throw new StoreException(ErrorCode.E0600, ex.getMessage(), ex);
149        }
150    }
151
152    @Override
153    public <S extends Store> WorkflowStore create(S store) throws StoreException {
154        try {
155            return new WorkflowStore(store, selectForUpdate);
156        }
157        catch (Exception ex) {
158            throw new StoreException(ErrorCode.E0600, ex.getMessage(), ex);
159        }
160    }
161
162
163    @Override
164    public void instrument(Instrumentation instr) {
165        final WorkflowJob.Status[] wfStatusArr = WorkflowJob.Status.values();
166        for (WorkflowJob.Status aWfStatusArr : wfStatusArr) {
167            final String statusName = aWfStatusArr.name();
168            instr.addVariable(INSTRUMENTATION_GROUP, statusName, new Instrumentation.Variable<Long>() {
169                public Long getValue() {
170                    return statusCounts.get(statusName).longValue();
171                }
172            });
173            instr.addVariable(INSTRUMENTATION_GROUP_WINDOW, statusName, new Instrumentation.Variable<Long>() {
174                public Long getValue() {
175                    return statusWindowCounts.get(statusName).longValue();
176                }
177            });
178        }
179    }
180}