001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.io.IOException; 021 import java.text.MessageFormat; 022 import java.util.Properties; 023 024 import javax.persistence.EntityManager; 025 import javax.persistence.EntityManagerFactory; 026 import javax.persistence.Persistence; 027 import javax.persistence.PersistenceException; 028 029 import org.apache.hadoop.conf.Configuration; 030 import org.apache.oozie.BundleActionBean; 031 import org.apache.oozie.BundleJobBean; 032 import org.apache.oozie.CoordinatorActionBean; 033 import org.apache.oozie.CoordinatorJobBean; 034 import org.apache.oozie.ErrorCode; 035 import org.apache.oozie.FaultInjection; 036 import org.apache.oozie.SLAEventBean; 037 import org.apache.oozie.WorkflowActionBean; 038 import org.apache.oozie.WorkflowJobBean; 039 import org.apache.oozie.client.rest.JsonBundleJob; 040 import org.apache.oozie.client.rest.JsonCoordinatorAction; 041 import org.apache.oozie.client.rest.JsonCoordinatorJob; 042 import org.apache.oozie.client.rest.JsonSLAEvent; 043 import org.apache.oozie.client.rest.JsonWorkflowAction; 044 import org.apache.oozie.client.rest.JsonWorkflowJob; 045 import org.apache.oozie.executor.jpa.JPAExecutor; 046 import org.apache.oozie.executor.jpa.JPAExecutorException; 047 import org.apache.oozie.sla.SLARegistrationBean; 048 import org.apache.oozie.sla.SLASummaryBean; 049 import org.apache.oozie.util.IOUtils; 050 import org.apache.oozie.util.Instrumentable; 051 import org.apache.oozie.util.Instrumentation; 052 import org.apache.oozie.util.XLog; 053 import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI; 054 055 /** 056 * Service that manages JPA and executes {@link JPAExecutor}. 057 */ 058 @SuppressWarnings("deprecation") 059 public class JPAService implements Service, Instrumentable { 060 private static final String INSTRUMENTATION_GROUP = "jpa"; 061 062 public static final String CONF_DB_SCHEMA = "oozie.db.schema.name"; 063 064 public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService."; 065 public static final String CONF_URL = CONF_PREFIX + "jdbc.url"; 066 public static final String CONF_DRIVER = CONF_PREFIX + "jdbc.driver"; 067 public static final String CONF_USERNAME = CONF_PREFIX + "jdbc.username"; 068 public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password"; 069 public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source"; 070 071 public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn"; 072 public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema"; 073 public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection"; 074 public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval"; 075 public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num"; 076 077 078 private EntityManagerFactory factory; 079 private Instrumentation instr; 080 081 private static XLog LOG; 082 083 /** 084 * Return the public interface of the service. 085 * 086 * @return {@link JPAService}. 087 */ 088 public Class<? extends Service> getInterface() { 089 return JPAService.class; 090 } 091 092 @Override 093 public void instrument(Instrumentation instr) { 094 this.instr = instr; 095 } 096 097 /** 098 * Initializes the {@link JPAService}. 099 * 100 * @param services services instance. 101 */ 102 public void init(Services services) throws ServiceException { 103 LOG = XLog.getLog(JPAService.class); 104 Configuration conf = services.getConf(); 105 String dbSchema = conf.get(CONF_DB_SCHEMA, "oozie"); 106 String url = conf.get(CONF_URL, "jdbc:derby:${oozie.home.dir}/${oozie.db.schema.name}-db;create=true"); 107 String driver = conf.get(CONF_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver"); 108 String user = conf.get(CONF_USERNAME, "sa"); 109 String password = conf.get(CONF_PASSWORD, "").trim(); 110 String maxConn = conf.get(CONF_MAX_ACTIVE_CONN, "10").trim(); 111 String dataSource = conf.get(CONF_CONN_DATA_SOURCE, "org.apache.commons.dbcp.BasicDataSource"); 112 boolean autoSchemaCreation = conf.getBoolean(CONF_CREATE_DB_SCHEMA, true); 113 boolean validateDbConn = conf.getBoolean(CONF_VALIDATE_DB_CONN, false); 114 String evictionInterval = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL, "300000").trim(); 115 String evictionNum = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_NUM, "10").trim(); 116 117 if (!url.startsWith("jdbc:")) { 118 throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'"); 119 } 120 String dbType = url.substring("jdbc:".length()); 121 if (dbType.indexOf(":") <= 0) { 122 throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'"); 123 } 124 dbType = dbType.substring(0, dbType.indexOf(":")); 125 126 String persistentUnit = "oozie-" + dbType; 127 128 // Checking existince of ORM file for DB type 129 String ormFile = "META-INF/" + persistentUnit + "-orm.xml"; 130 try { 131 IOUtils.getResourceAsStream(ormFile, -1); 132 } 133 catch (IOException ex) { 134 throw new ServiceException(ErrorCode.E0609, dbType, ormFile); 135 } 136 137 String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}"; 138 connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn); 139 Properties props = new Properties(); 140 if (autoSchemaCreation) { 141 connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; 142 props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)"); 143 } 144 else if (validateDbConn) { 145 // validation can be done only if the schema already exist, else a 146 // connection cannot be obtained to create the schema. 147 String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval; 148 String num = "numTestsPerEvictionRun=" + evictionNum; 149 connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num; 150 connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN"; 151 connProps = MessageFormat.format(connProps, dbSchema); 152 } 153 else { 154 connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; 155 } 156 props.setProperty("openjpa.ConnectionProperties", connProps); 157 158 props.setProperty("openjpa.ConnectionDriverName", dataSource); 159 160 factory = Persistence.createEntityManagerFactory(persistentUnit, props); 161 162 EntityManager entityManager = getEntityManager(); 163 entityManager.find(WorkflowActionBean.class, 1); 164 entityManager.find(WorkflowJobBean.class, 1); 165 entityManager.find(CoordinatorActionBean.class, 1); 166 entityManager.find(CoordinatorJobBean.class, 1); 167 entityManager.find(JsonWorkflowAction.class, 1); 168 entityManager.find(JsonWorkflowJob.class, 1); 169 entityManager.find(JsonCoordinatorAction.class, 1); 170 entityManager.find(JsonCoordinatorJob.class, 1); 171 entityManager.find(SLAEventBean.class, 1); 172 entityManager.find(JsonSLAEvent.class, 1); 173 entityManager.find(BundleJobBean.class, 1); 174 entityManager.find(JsonBundleJob.class, 1); 175 entityManager.find(BundleActionBean.class, 1); 176 entityManager.find(SLARegistrationBean.class, 1); 177 entityManager.find(SLASummaryBean.class, 1); 178 179 LOG.info(XLog.STD, "All entities initialized"); 180 // need to use a pseudo no-op transaction so all entities, datasource 181 // and connection pool are initialized one time only 182 entityManager.getTransaction().begin(); 183 OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory; 184 // Mask the password with '***' 185 String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,"); 186 LOG.info("JPA configuration: {0}", logMsg); 187 entityManager.getTransaction().commit(); 188 entityManager.close(); 189 } 190 191 /** 192 * Destroy the JPAService 193 */ 194 public void destroy() { 195 if (factory != null && factory.isOpen()) { 196 factory.close(); 197 } 198 } 199 200 /** 201 * Execute a {@link JPAExecutor}. 202 * 203 * @param executor JPAExecutor to execute. 204 * @return return value of the JPAExecutor. 205 * @throws JPAExecutorException thrown if an jpa executor failed 206 */ 207 public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException { 208 EntityManager em = getEntityManager(); 209 Instrumentation.Cron cron = new Instrumentation.Cron(); 210 try { 211 LOG.trace("Executing JPAExecutor [{0}]", executor.getName()); 212 if (instr != null) { 213 instr.incr(INSTRUMENTATION_GROUP, executor.getName(), 1); 214 } 215 cron.start(); 216 em.getTransaction().begin(); 217 T t = executor.execute(em); 218 if (em.getTransaction().isActive()) { 219 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { 220 throw new RuntimeException("Skipping Commit for Failover Testing"); 221 } 222 223 em.getTransaction().commit(); 224 } 225 return t; 226 } 227 catch (PersistenceException e) { 228 throw new JPAExecutorException(ErrorCode.E0603, e); 229 } 230 finally { 231 cron.stop(); 232 if (instr != null) { 233 instr.addCron(INSTRUMENTATION_GROUP, executor.getName(), cron); 234 } 235 try { 236 if (em.getTransaction().isActive()) { 237 LOG.warn("JPAExecutor [{0}] ended with an active transaction, rolling back", executor.getName()); 238 em.getTransaction().rollback(); 239 } 240 } 241 catch (Exception ex) { 242 LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex 243 .getMessage(), ex); 244 } 245 try { 246 if (em.isOpen()) { 247 em.close(); 248 } 249 else { 250 LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName()); 251 } 252 } 253 catch (Exception ex) { 254 LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex 255 .getMessage(), ex); 256 } 257 } 258 } 259 260 /** 261 * Return an EntityManager. Used by the StoreService. Once the StoreService is removed this method must be removed. 262 * 263 * @return an entity manager 264 */ 265 EntityManager getEntityManager() { 266 return factory.createEntityManager(); 267 } 268 269 }