001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.sql.Connection; 021 import java.util.HashMap; 022 import java.util.Map; 023 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.client.WorkflowJob; 027 import org.apache.oozie.service.SchemaService.SchemaName; 028 import org.apache.oozie.store.Store; 029 import org.apache.oozie.store.StoreException; 030 import org.apache.oozie.store.WorkflowStore; 031 import org.apache.oozie.util.Instrumentable; 032 import org.apache.oozie.util.Instrumentation; 033 import org.apache.oozie.util.XLog; 034 import org.apache.oozie.workflow.WorkflowLib; 035 import org.apache.oozie.workflow.lite.DBLiteWorkflowLib; 036 037 public class DBLiteWorkflowStoreService extends LiteWorkflowStoreService implements Instrumentable { 038 private boolean selectForUpdate; 039 private XLog log; 040 private int statusWindow; 041 042 public static final String CONF_PREFIX = Service.CONF_PREFIX + "DBLiteWorkflowStoreService."; 043 public static final String CONF_METRICS_INTERVAL_MINS = CONF_PREFIX + "status.metrics.collection.interval"; 044 public static final String CONF_METRICS_INTERVAL_WINDOW = CONF_PREFIX + "status.metrics.window"; 045 046 private static final String INSTRUMENTATION_GROUP = "jobstatus"; 047 private static final String INSTRUMENTATION_GROUP_WINDOW = "windowjobstatus"; 048 049 private Map<String, Integer> statusCounts = new HashMap<String, Integer>(); 050 private Map<String, Integer> statusWindowCounts = new HashMap<String, Integer>(); 051 052 /** 053 * Gets the number of workflows for each status and populates the hash. 054 */ 055 class JobStatusCountCallable implements Runnable { 056 @Override 057 public void run() { 058 WorkflowStore store = null; 059 try { 060 store = Services.get().get(WorkflowStoreService.class).create(); 061 store.beginTrx(); 062 WorkflowJob.Status[] wfStatusArr = WorkflowJob.Status.values(); 063 for (WorkflowJob.Status aWfStatusArr : wfStatusArr) { 064 statusCounts.put(aWfStatusArr.name(), store.getWorkflowCountWithStatus(aWfStatusArr.name())); 065 statusWindowCounts.put(aWfStatusArr.name(), store.getWorkflowCountWithStatusInLastNSeconds( 066 aWfStatusArr.name(), statusWindow)); 067 } 068 store.commitTrx(); 069 } 070 catch (StoreException e) { 071 if (store != null) { 072 store.rollbackTrx(); 073 } 074 log.warn("Exception while accessing the store", e); 075 } 076 catch (Exception ex) { 077 log.error("Exception, {0}", ex.getMessage(), ex); 078 if (store != null && store.isActive()) { 079 try { 080 store.rollbackTrx(); 081 } 082 catch (RuntimeException rex) { 083 log.warn("openjpa error, {0}", rex.getMessage(), rex); 084 } 085 } 086 } 087 finally { 088 if (store != null) { 089 if (!store.isActive()) { 090 try { 091 store.closeTrx(); 092 } 093 catch (RuntimeException rex) { 094 log.warn("Exception while attempting to close store", rex); 095 } 096 } 097 else { 098 log.warn("transaction is not committed or rolled back before closing entitymanager."); 099 } 100 } 101 } 102 } 103 } 104 105 public void init(Services services) throws ServiceException { 106 Configuration conf = services.getConf(); 107 statusWindow = conf.getInt(CONF_METRICS_INTERVAL_WINDOW, 3600); 108 int statusMetricsCollectionInterval = conf.getInt(CONF_METRICS_INTERVAL_MINS, 5); 109 log = XLog.getLog(getClass()); 110 selectForUpdate = false; 111 112 WorkflowJob.Status[] wfStatusArr = WorkflowJob.Status.values(); 113 for (WorkflowJob.Status aWfStatusArr : wfStatusArr) { 114 statusCounts.put(aWfStatusArr.name(), 0); 115 statusWindowCounts.put(aWfStatusArr.name(), 0); 116 } 117 Runnable jobStatusCountCallable = new JobStatusCountCallable(); 118 services.get(SchedulerService.class).schedule(jobStatusCountCallable, 1, statusMetricsCollectionInterval, 119 SchedulerService.Unit.MIN); 120 } 121 122 public void destroy() { 123 } 124 125 /** 126 * Return the workflow lib without DB connection. Will be used for parsing purpose. 127 * 128 * @return Workflow Library 129 */ 130 @Override 131 public WorkflowLib getWorkflowLibWithNoDB() { 132 return getWorkflowLib(null); 133 } 134 135 private WorkflowLib getWorkflowLib(Connection conn) { 136 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW); 137 return new DBLiteWorkflowLib(schema, LiteControlNodeHandler.class, 138 LiteDecisionHandler.class, LiteActionHandler.class, conn); 139 } 140 141 @Override 142 public WorkflowStore create() throws StoreException { 143 try { 144 return new WorkflowStore(selectForUpdate); 145 } 146 catch (Exception ex) { 147 throw new StoreException(ErrorCode.E0600, ex.getMessage(), ex); 148 } 149 } 150 151 @Override 152 public <S extends Store> WorkflowStore create(S store) throws StoreException { 153 try { 154 return new WorkflowStore(store, selectForUpdate); 155 } 156 catch (Exception ex) { 157 throw new StoreException(ErrorCode.E0600, ex.getMessage(), ex); 158 } 159 } 160 161 162 @Override 163 public void instrument(Instrumentation instr) { 164 final WorkflowJob.Status[] wfStatusArr = WorkflowJob.Status.values(); 165 for (WorkflowJob.Status aWfStatusArr : wfStatusArr) { 166 final String statusName = aWfStatusArr.name(); 167 instr.addVariable(INSTRUMENTATION_GROUP, statusName, new Instrumentation.Variable<Long>() { 168 public Long getValue() { 169 return statusCounts.get(statusName).longValue(); 170 } 171 }); 172 instr.addVariable(INSTRUMENTATION_GROUP_WINDOW, statusName, new Instrumentation.Variable<Long>() { 173 public Long getValue() { 174 return statusWindowCounts.get(statusName).longValue(); 175 } 176 }); 177 } 178 } 179 }