001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.sql.Connection; 022import java.util.HashMap; 023import java.util.Map; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.client.WorkflowJob; 028import org.apache.oozie.service.SchemaService.SchemaName; 029import org.apache.oozie.store.Store; 030import org.apache.oozie.store.StoreException; 031import org.apache.oozie.store.WorkflowStore; 032import org.apache.oozie.util.Instrumentable; 033import org.apache.oozie.util.Instrumentation; 034import org.apache.oozie.util.XLog; 035import org.apache.oozie.workflow.WorkflowLib; 036import org.apache.oozie.workflow.lite.DBLiteWorkflowLib; 037 038public class DBLiteWorkflowStoreService extends LiteWorkflowStoreService implements Instrumentable { 039 private boolean selectForUpdate; 040 private XLog log; 041 private int statusWindow; 042 043 public static final String CONF_PREFIX = Service.CONF_PREFIX + "DBLiteWorkflowStoreService."; 044 public static final String CONF_METRICS_INTERVAL_MINS = CONF_PREFIX + "status.metrics.collection.interval"; 045 public static final String CONF_METRICS_INTERVAL_WINDOW = CONF_PREFIX + "status.metrics.window"; 046 047 private static final String INSTRUMENTATION_GROUP = "jobstatus"; 048 private static final String INSTRUMENTATION_GROUP_WINDOW = "windowjobstatus"; 049 050 private Map<String, Integer> statusCounts = new HashMap<String, Integer>(); 051 private Map<String, Integer> statusWindowCounts = new HashMap<String, Integer>(); 052 053 /** 054 * Gets the number of workflows for each status and populates the hash. 055 */ 056 class JobStatusCountCallable implements Runnable { 057 @Override 058 public void run() { 059 WorkflowStore store = null; 060 try { 061 store = Services.get().get(WorkflowStoreService.class).create(); 062 store.beginTrx(); 063 WorkflowJob.Status[] wfStatusArr = WorkflowJob.Status.values(); 064 for (WorkflowJob.Status aWfStatusArr : wfStatusArr) { 065 statusCounts.put(aWfStatusArr.name(), store.getWorkflowCountWithStatus(aWfStatusArr.name())); 066 statusWindowCounts.put(aWfStatusArr.name(), store.getWorkflowCountWithStatusInLastNSeconds( 067 aWfStatusArr.name(), statusWindow)); 068 } 069 store.commitTrx(); 070 } 071 catch (StoreException e) { 072 if (store != null) { 073 store.rollbackTrx(); 074 } 075 log.warn("Exception while accessing the store", e); 076 } 077 catch (Exception ex) { 078 log.error("Exception, {0}", ex.getMessage(), ex); 079 if (store != null && store.isActive()) { 080 try { 081 store.rollbackTrx(); 082 } 083 catch (RuntimeException rex) { 084 log.warn("openjpa error, {0}", rex.getMessage(), rex); 085 } 086 } 087 } 088 finally { 089 if (store != null) { 090 if (!store.isActive()) { 091 try { 092 store.closeTrx(); 093 } 094 catch (RuntimeException rex) { 095 log.warn("Exception while attempting to close store", rex); 096 } 097 } 098 else { 099 log.warn("transaction is not committed or rolled back before closing entitymanager."); 100 } 101 } 102 } 103 } 104 } 105 106 public void init(Services services) throws ServiceException { 107 Configuration conf = services.getConf(); 108 statusWindow = ConfigurationService.getInt(conf, CONF_METRICS_INTERVAL_WINDOW); 109 int statusMetricsCollectionInterval = ConfigurationService.getInt(conf, CONF_METRICS_INTERVAL_MINS); 110 log = XLog.getLog(getClass()); 111 selectForUpdate = false; 112 113 WorkflowJob.Status[] wfStatusArr = WorkflowJob.Status.values(); 114 for (WorkflowJob.Status aWfStatusArr : wfStatusArr) { 115 statusCounts.put(aWfStatusArr.name(), 0); 116 statusWindowCounts.put(aWfStatusArr.name(), 0); 117 } 118 Runnable jobStatusCountCallable = new JobStatusCountCallable(); 119 services.get(SchedulerService.class).schedule(jobStatusCountCallable, 1, statusMetricsCollectionInterval, 120 SchedulerService.Unit.MIN); 121 } 122 123 public void destroy() { 124 } 125 126 /** 127 * Return the workflow lib without DB connection. Will be used for parsing purpose. 128 * 129 * @return Workflow Library 130 */ 131 @Override 132 public WorkflowLib getWorkflowLibWithNoDB() { 133 return getWorkflowLib(null); 134 } 135 136 private WorkflowLib getWorkflowLib(Connection conn) { 137 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW); 138 return new DBLiteWorkflowLib(schema, LiteControlNodeHandler.class, 139 LiteDecisionHandler.class, LiteActionHandler.class, conn); 140 } 141 142 @Override 143 public WorkflowStore create() throws StoreException { 144 try { 145 return new WorkflowStore(selectForUpdate); 146 } 147 catch (Exception ex) { 148 throw new StoreException(ErrorCode.E0600, ex.getMessage(), ex); 149 } 150 } 151 152 @Override 153 public <S extends Store> WorkflowStore create(S store) throws StoreException { 154 try { 155 return new WorkflowStore(store, selectForUpdate); 156 } 157 catch (Exception ex) { 158 throw new StoreException(ErrorCode.E0600, ex.getMessage(), ex); 159 } 160 } 161 162 163 @Override 164 public void instrument(Instrumentation instr) { 165 final WorkflowJob.Status[] wfStatusArr = WorkflowJob.Status.values(); 166 for (WorkflowJob.Status aWfStatusArr : wfStatusArr) { 167 final String statusName = aWfStatusArr.name(); 168 instr.addVariable(INSTRUMENTATION_GROUP, statusName, new Instrumentation.Variable<Long>() { 169 public Long getValue() { 170 return statusCounts.get(statusName).longValue(); 171 } 172 }); 173 instr.addVariable(INSTRUMENTATION_GROUP_WINDOW, statusName, new Instrumentation.Variable<Long>() { 174 public Long getValue() { 175 return statusWindowCounts.get(statusName).longValue(); 176 } 177 }); 178 } 179 } 180}