001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.io.IOException; 022import java.text.MessageFormat; 023import java.util.Collection; 024import java.util.List; 025import java.util.Properties; 026 027import javax.persistence.EntityManager; 028import javax.persistence.EntityManagerFactory; 029import javax.persistence.NoResultException; 030import javax.persistence.Persistence; 031import javax.persistence.PersistenceException; 032import javax.persistence.Query; 033 034import org.apache.commons.dbcp.BasicDataSource; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.oozie.BundleActionBean; 037import org.apache.oozie.BundleJobBean; 038import org.apache.oozie.CoordinatorActionBean; 039import org.apache.oozie.CoordinatorJobBean; 040import org.apache.oozie.ErrorCode; 041import org.apache.oozie.FaultInjection; 042import org.apache.oozie.SLAEventBean; 043import org.apache.oozie.WorkflowActionBean; 044import org.apache.oozie.WorkflowJobBean; 045import org.apache.oozie.client.rest.JsonBean; 046import org.apache.oozie.client.rest.JsonSLAEvent; 047import org.apache.oozie.compression.CodecFactory; 048import org.apache.oozie.executor.jpa.JPAExecutor; 049import org.apache.oozie.executor.jpa.JPAExecutorException; 050import org.apache.oozie.sla.SLARegistrationBean; 051import org.apache.oozie.sla.SLASummaryBean; 052import org.apache.oozie.util.IOUtils; 053import org.apache.oozie.util.Instrumentable; 054import org.apache.oozie.util.Instrumentation; 055import org.apache.oozie.util.XLog; 056import org.apache.openjpa.lib.jdbc.DecoratingDataSource; 057import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI; 058 059/** 060 * Service that manages JPA and executes {@link JPAExecutor}. 061 */ 062@SuppressWarnings("deprecation") 063public class JPAService implements Service, Instrumentable { 064 private static final String INSTRUMENTATION_GROUP_JPA = "jpa"; 065 066 public static final String CONF_DB_SCHEMA = "oozie.db.schema.name"; 067 068 public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService."; 069 public static final String CONF_URL = CONF_PREFIX + "jdbc.url"; 070 public static final String CONF_DRIVER = CONF_PREFIX + "jdbc.driver"; 071 public static final String CONF_USERNAME = CONF_PREFIX + "jdbc.username"; 072 public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password"; 073 public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source"; 074 public static final String CONF_CONN_PROPERTIES = CONF_PREFIX + "connection.properties"; 075 public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn"; 076 public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema"; 077 public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection"; 078 public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval"; 079 public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num"; 080 081 082 private EntityManagerFactory factory; 083 private Instrumentation instr; 084 085 private static XLog LOG; 086 087 /** 088 * Return the public interface of the service. 089 * 090 * @return {@link JPAService}. 091 */ 092 public Class<? extends Service> getInterface() { 093 return JPAService.class; 094 } 095 096 @Override 097 public void instrument(Instrumentation instr) { 098 this.instr = instr; 099 100 final BasicDataSource dataSource = getBasicDataSource(); 101 if (dataSource != null) { 102 instr.addSampler("jdbc", "connections.active", 60, 1, new Instrumentation.Variable<Long>() { 103 @Override 104 public Long getValue() { 105 return (long) dataSource.getNumActive(); 106 } 107 }); 108 instr.addSampler("jdbc", "connections.idle", 60, 1, new Instrumentation.Variable<Long>() { 109 @Override 110 public Long getValue() { 111 return (long) dataSource.getNumIdle(); 112 } 113 }); 114 } 115 } 116 117 private BasicDataSource getBasicDataSource() { 118 // Get the BasicDataSource object; it could be wrapped in a DecoratingDataSource 119 // It might also not be a BasicDataSource if the user configured something different 120 BasicDataSource basicDataSource = null; 121 OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory; 122 Object connectionFactory = spi.getConfiguration().getConnectionFactory(); 123 if (connectionFactory instanceof DecoratingDataSource) { 124 DecoratingDataSource decoratingDataSource = (DecoratingDataSource) connectionFactory; 125 basicDataSource = (BasicDataSource) decoratingDataSource.getInnermostDelegate(); 126 } else if (connectionFactory instanceof BasicDataSource) { 127 basicDataSource = (BasicDataSource) connectionFactory; 128 } 129 return basicDataSource; 130 } 131 132 /** 133 * Initializes the {@link JPAService}. 134 * 135 * @param services services instance. 136 */ 137 public void init(Services services) throws ServiceException { 138 LOG = XLog.getLog(JPAService.class); 139 Configuration conf = services.getConf(); 140 String dbSchema = ConfigurationService.get(conf, CONF_DB_SCHEMA); 141 String url = ConfigurationService.get(conf, CONF_URL); 142 String driver = ConfigurationService.get(conf, CONF_DRIVER); 143 String user = ConfigurationService.get(conf, CONF_USERNAME); 144 String password = ConfigurationService.get(conf, CONF_PASSWORD).trim(); 145 String maxConn = ConfigurationService.get(conf, CONF_MAX_ACTIVE_CONN).trim(); 146 String dataSource = ConfigurationService.get(conf, CONF_CONN_DATA_SOURCE); 147 String connPropsConfig = ConfigurationService.get(conf, CONF_CONN_PROPERTIES); 148 boolean autoSchemaCreation = ConfigurationService.getBoolean(conf, CONF_CREATE_DB_SCHEMA); 149 boolean validateDbConn = ConfigurationService.getBoolean(conf, CONF_VALIDATE_DB_CONN); 150 String evictionInterval = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL).trim(); 151 String evictionNum = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_NUM).trim(); 152 153 if (!url.startsWith("jdbc:")) { 154 throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'"); 155 } 156 String dbType = url.substring("jdbc:".length()); 157 if (dbType.indexOf(":") <= 0) { 158 throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'"); 159 } 160 dbType = dbType.substring(0, dbType.indexOf(":")); 161 162 String persistentUnit = "oozie-" + dbType; 163 164 // Checking existince of ORM file for DB type 165 String ormFile = "META-INF/" + persistentUnit + "-orm.xml"; 166 try { 167 IOUtils.getResourceAsStream(ormFile, -1); 168 } 169 catch (IOException ex) { 170 throw new ServiceException(ErrorCode.E0609, dbType, ormFile); 171 } 172 173 String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}"; 174 connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn); 175 Properties props = new Properties(); 176 if (autoSchemaCreation) { 177 connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; 178 props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)"); 179 } 180 else if (validateDbConn) { 181 // validation can be done only if the schema already exist, else a 182 // connection cannot be obtained to create the schema. 183 String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval; 184 String num = "numTestsPerEvictionRun=" + evictionNum; 185 connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num; 186 connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN"; 187 connProps = MessageFormat.format(connProps, dbSchema); 188 } 189 else { 190 connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; 191 } 192 if (connPropsConfig != null) { 193 connProps += "," + connPropsConfig; 194 } 195 props.setProperty("openjpa.ConnectionProperties", connProps); 196 197 props.setProperty("openjpa.ConnectionDriverName", dataSource); 198 199 factory = Persistence.createEntityManagerFactory(persistentUnit, props); 200 201 EntityManager entityManager = getEntityManager(); 202 entityManager.find(WorkflowActionBean.class, 1); 203 entityManager.find(WorkflowJobBean.class, 1); 204 entityManager.find(CoordinatorActionBean.class, 1); 205 entityManager.find(CoordinatorJobBean.class, 1); 206 entityManager.find(SLAEventBean.class, 1); 207 entityManager.find(JsonSLAEvent.class, 1); 208 entityManager.find(BundleJobBean.class, 1); 209 entityManager.find(BundleActionBean.class, 1); 210 entityManager.find(SLARegistrationBean.class, 1); 211 entityManager.find(SLASummaryBean.class, 1); 212 213 LOG.info(XLog.STD, "All entities initialized"); 214 // need to use a pseudo no-op transaction so all entities, datasource 215 // and connection pool are initialized one time only 216 entityManager.getTransaction().begin(); 217 OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory; 218 // Mask the password with '***' 219 String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,"); 220 LOG.info("JPA configuration: {0}", logMsg); 221 entityManager.getTransaction().commit(); 222 entityManager.close(); 223 try { 224 CodecFactory.initialize(conf); 225 } 226 catch (Exception ex) { 227 throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex); 228 } 229 } 230 231 /** 232 * Destroy the JPAService 233 */ 234 public void destroy() { 235 if (factory != null && factory.isOpen()) { 236 factory.close(); 237 } 238 } 239 240 /** 241 * Execute a {@link JPAExecutor}. 242 * 243 * @param executor JPAExecutor to execute. 244 * @return return value of the JPAExecutor. 245 * @throws JPAExecutorException thrown if an jpa executor failed 246 */ 247 public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException { 248 EntityManager em = getEntityManager(); 249 Instrumentation.Cron cron = new Instrumentation.Cron(); 250 try { 251 LOG.trace("Executing JPAExecutor [{0}]", executor.getName()); 252 if (instr != null) { 253 instr.incr(INSTRUMENTATION_GROUP_JPA, executor.getName(), 1); 254 } 255 cron.start(); 256 em.getTransaction().begin(); 257 T t = executor.execute(em); 258 if (em.getTransaction().isActive()) { 259 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { 260 throw new RuntimeException("Skipping Commit for Failover Testing"); 261 } 262 263 em.getTransaction().commit(); 264 } 265 return t; 266 } 267 catch (PersistenceException e) { 268 throw new JPAExecutorException(ErrorCode.E0603, e); 269 } 270 finally { 271 cron.stop(); 272 if (instr != null) { 273 instr.addCron(INSTRUMENTATION_GROUP_JPA, executor.getName(), cron); 274 } 275 try { 276 if (em.getTransaction().isActive()) { 277 LOG.warn("JPAExecutor [{0}] ended with an active transaction, rolling back", executor.getName()); 278 em.getTransaction().rollback(); 279 } 280 } 281 catch (Exception ex) { 282 LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex 283 .getMessage(), ex); 284 } 285 try { 286 if (em.isOpen()) { 287 em.close(); 288 } 289 else { 290 LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName()); 291 } 292 } 293 catch (Exception ex) { 294 LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex 295 .getMessage(), ex); 296 } 297 } 298 } 299 300 /** 301 * Execute an UPDATE query 302 * @param namedQueryName the name of query to be executed 303 * @param query query instance to be executed 304 * @param em Entity Manager 305 * @return Integer that query returns, which corresponds to the number of rows updated 306 * @throws JPAExecutorException 307 */ 308 public int executeUpdate(String namedQueryName, Query query, EntityManager em) throws JPAExecutorException { 309 Instrumentation.Cron cron = new Instrumentation.Cron(); 310 try { 311 312 LOG.trace("Executing Update/Delete Query [{0}]", namedQueryName); 313 if (instr != null) { 314 instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1); 315 } 316 cron.start(); 317 em.getTransaction().begin(); 318 int ret = query.executeUpdate(); 319 if (em.getTransaction().isActive()) { 320 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { 321 throw new RuntimeException("Skipping Commit for Failover Testing"); 322 } 323 em.getTransaction().commit(); 324 } 325 return ret; 326 } 327 catch (PersistenceException e) { 328 throw new JPAExecutorException(ErrorCode.E0603, e); 329 } 330 finally { 331 processFinally(em, cron, namedQueryName, true); 332 } 333 } 334 335 public static class QueryEntry<E extends Enum<E>> { 336 E namedQuery; 337 Query query; 338 339 public QueryEntry(E namedQuery, Query query) { 340 this.namedQuery = namedQuery; 341 this.query = query; 342 } 343 344 public Query getQuery() { 345 return this.query; 346 } 347 348 public E getQueryName() { 349 return this.namedQuery; 350 } 351 } 352 353 private void processFinally(EntityManager em, Instrumentation.Cron cron, String name, boolean checkActive) { 354 cron.stop(); 355 if (instr != null) { 356 instr.addCron(INSTRUMENTATION_GROUP_JPA, name, cron); 357 } 358 if (checkActive) { 359 try { 360 if (em.getTransaction().isActive()) { 361 LOG.warn("[{0}] ended with an active transaction, rolling back", name); 362 em.getTransaction().rollback(); 363 } 364 } 365 catch (Exception ex) { 366 LOG.warn("Could not check/rollback transaction after [{0}], {1}", name, 367 ex.getMessage(), ex); 368 } 369 } 370 try { 371 if (em.isOpen()) { 372 em.close(); 373 } 374 else { 375 LOG.warn("[{0}] closed the EntityManager, it should not!", name); 376 } 377 } 378 catch (Exception ex) { 379 LOG.warn("Could not close EntityManager after [{0}], {1}", name, ex.getMessage(), ex); 380 } 381 } 382 383 /** 384 * Execute multiple update/insert queries in one transaction 385 * @param insertBeans list of beans to be inserted 386 * @param updateQueryList list of update queries 387 * @param deleteBeans list of beans to be deleted 388 * @param em Entity Manager 389 * @throws JPAExecutorException 390 */ 391 public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertBeans, List<QueryEntry> updateQueryList, 392 Collection<JsonBean> deleteBeans, EntityManager em) throws JPAExecutorException { 393 Instrumentation.Cron cron = new Instrumentation.Cron(); 394 try { 395 396 LOG.trace("Executing Queries in Batch"); 397 cron.start(); 398 em.getTransaction().begin(); 399 if (updateQueryList != null && updateQueryList.size() > 0) { 400 for (QueryEntry q : updateQueryList) { 401 if (instr != null) { 402 instr.incr(INSTRUMENTATION_GROUP_JPA, q.getQueryName().name(), 1); 403 } 404 q.getQuery().executeUpdate(); 405 } 406 } 407 if (insertBeans != null && insertBeans.size() > 0) { 408 for (JsonBean bean : insertBeans) { 409 em.persist(bean); 410 } 411 } 412 if (deleteBeans != null && deleteBeans.size() > 0) { 413 for (JsonBean bean : deleteBeans) { 414 em.remove(em.merge(bean)); 415 } 416 } 417 if (em.getTransaction().isActive()) { 418 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { 419 throw new RuntimeException("Skipping Commit for Failover Testing"); 420 } 421 em.getTransaction().commit(); 422 } 423 } 424 catch (PersistenceException e) { 425 throw new JPAExecutorException(ErrorCode.E0603, e); 426 } 427 finally { 428 processFinally(em, cron, "batchqueryexecutor", true); 429 } 430 } 431 432 /** 433 * Execute a SELECT query 434 * @param namedQueryName the name of query to be executed 435 * @param query query instance to be executed 436 * @param em Entity Manager 437 * @return object that matches the query 438 */ 439 public Object executeGet(String namedQueryName, Query query, EntityManager em) { 440 Instrumentation.Cron cron = new Instrumentation.Cron(); 441 try { 442 443 LOG.trace("Executing Select Query to Get a Single row [{0}]", namedQueryName); 444 if (instr != null) { 445 instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1); 446 } 447 448 cron.start(); 449 Object obj = null; 450 try { 451 obj = query.getSingleResult(); 452 } 453 catch (NoResultException e) { 454 // return null when no matched result 455 } 456 return obj; 457 } 458 finally { 459 processFinally(em, cron, namedQueryName, false); 460 } 461 } 462 463 /** 464 * Execute a SELECT query to get list of results 465 * @param namedQueryName the name of query to be executed 466 * @param query query instance to be executed 467 * @param em Entity Manager 468 * @return list containing results that match the query 469 */ 470 public List<?> executeGetList(String namedQueryName, Query query, EntityManager em) { 471 Instrumentation.Cron cron = new Instrumentation.Cron(); 472 try { 473 474 LOG.trace("Executing Select Query to Get Multiple Rows [{0}]", namedQueryName); 475 if (instr != null) { 476 instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1); 477 } 478 479 cron.start(); 480 List<?> resultList = null; 481 try { 482 resultList = query.getResultList(); 483 } 484 catch (NoResultException e) { 485 // return null when no matched result 486 } 487 return resultList; 488 } 489 finally { 490 processFinally(em, cron, namedQueryName, false); 491 } 492 } 493 494 /** 495 * Return an EntityManager. Used by the StoreService. Once the StoreService is removed this method must be removed. 496 * 497 * @return an entity manager 498 */ 499 public EntityManager getEntityManager() { 500 return factory.createEntityManager(); 501 } 502 503}