001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.io.IOException; 021import java.text.MessageFormat; 022import java.util.Collection; 023import java.util.List; 024import java.util.Properties; 025 026import javax.persistence.EntityManager; 027import javax.persistence.EntityManagerFactory; 028import javax.persistence.NoResultException; 029import javax.persistence.Persistence; 030import javax.persistence.PersistenceException; 031import javax.persistence.Query; 032 033import org.apache.commons.dbcp.BasicDataSource; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.oozie.BundleActionBean; 036import org.apache.oozie.BundleJobBean; 037import org.apache.oozie.CoordinatorActionBean; 038import org.apache.oozie.CoordinatorJobBean; 039import org.apache.oozie.ErrorCode; 040import org.apache.oozie.FaultInjection; 041import org.apache.oozie.SLAEventBean; 042import org.apache.oozie.WorkflowActionBean; 043import org.apache.oozie.WorkflowJobBean; 044import org.apache.oozie.client.rest.JsonBean; 045import org.apache.oozie.client.rest.JsonSLAEvent; 046import org.apache.oozie.compression.CodecFactory; 047import org.apache.oozie.executor.jpa.JPAExecutor; 048import org.apache.oozie.executor.jpa.JPAExecutorException; 049import org.apache.oozie.sla.SLARegistrationBean; 050import org.apache.oozie.sla.SLASummaryBean; 051import org.apache.oozie.util.IOUtils; 052import org.apache.oozie.util.Instrumentable; 053import org.apache.oozie.util.Instrumentation; 054import org.apache.oozie.util.XLog; 055import org.apache.openjpa.lib.jdbc.DecoratingDataSource; 056import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI; 057 058/** 059 * Service that manages JPA and executes {@link JPAExecutor}. 060 */ 061@SuppressWarnings("deprecation") 062public class JPAService implements Service, Instrumentable { 063 private static final String INSTRUMENTATION_GROUP_JPA = "jpa"; 064 065 public static final String CONF_DB_SCHEMA = "oozie.db.schema.name"; 066 067 public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService."; 068 public static final String CONF_URL = CONF_PREFIX + "jdbc.url"; 069 public static final String CONF_DRIVER = CONF_PREFIX + "jdbc.driver"; 070 public static final String CONF_USERNAME = CONF_PREFIX + "jdbc.username"; 071 public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password"; 072 public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source"; 073 public static final String CONF_CONN_PROPERTIES = CONF_PREFIX + "connection.properties"; 074 public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn"; 075 public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema"; 076 public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection"; 077 public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval"; 078 public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num"; 079 080 081 private EntityManagerFactory factory; 082 private Instrumentation instr; 083 084 private static XLog LOG; 085 086 /** 087 * Return the public interface of the service. 088 * 089 * @return {@link JPAService}. 090 */ 091 public Class<? extends Service> getInterface() { 092 return JPAService.class; 093 } 094 095 @Override 096 public void instrument(Instrumentation instr) { 097 this.instr = instr; 098 099 final BasicDataSource dataSource = getBasicDataSource(); 100 if (dataSource != null) { 101 instr.addSampler("jdbc", "connections.active", 60, 1, new Instrumentation.Variable<Long>() { 102 @Override 103 public Long getValue() { 104 return (long) dataSource.getNumActive(); 105 } 106 }); 107 instr.addSampler("jdbc", "connections.idle", 60, 1, new Instrumentation.Variable<Long>() { 108 @Override 109 public Long getValue() { 110 return (long) dataSource.getNumIdle(); 111 } 112 }); 113 } 114 } 115 116 private BasicDataSource getBasicDataSource() { 117 // Get the BasicDataSource object; it could be wrapped in a DecoratingDataSource 118 // It might also not be a BasicDataSource if the user configured something different 119 BasicDataSource basicDataSource = null; 120 OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory; 121 Object connectionFactory = spi.getConfiguration().getConnectionFactory(); 122 if (connectionFactory instanceof DecoratingDataSource) { 123 DecoratingDataSource decoratingDataSource = (DecoratingDataSource) connectionFactory; 124 basicDataSource = (BasicDataSource) decoratingDataSource.getInnermostDelegate(); 125 } else if (connectionFactory instanceof BasicDataSource) { 126 basicDataSource = (BasicDataSource) connectionFactory; 127 } 128 return basicDataSource; 129 } 130 131 /** 132 * Initializes the {@link JPAService}. 133 * 134 * @param services services instance. 135 */ 136 public void init(Services services) throws ServiceException { 137 LOG = XLog.getLog(JPAService.class); 138 Configuration conf = services.getConf(); 139 String dbSchema = conf.get(CONF_DB_SCHEMA, "oozie"); 140 String url = conf.get(CONF_URL, "jdbc:derby:${oozie.home.dir}/${oozie.db.schema.name}-db;create=true"); 141 String driver = conf.get(CONF_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver"); 142 String user = conf.get(CONF_USERNAME, "sa"); 143 String password = conf.get(CONF_PASSWORD, "").trim(); 144 String maxConn = conf.get(CONF_MAX_ACTIVE_CONN, "10").trim(); 145 String dataSource = conf.get(CONF_CONN_DATA_SOURCE, "org.apache.commons.dbcp.BasicDataSource"); 146 String connPropsConfig = conf.get(CONF_CONN_PROPERTIES); 147 boolean autoSchemaCreation = conf.getBoolean(CONF_CREATE_DB_SCHEMA, false); 148 boolean validateDbConn = conf.getBoolean(CONF_VALIDATE_DB_CONN, true); 149 String evictionInterval = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL, "300000").trim(); 150 String evictionNum = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_NUM, "10").trim(); 151 152 if (!url.startsWith("jdbc:")) { 153 throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'"); 154 } 155 String dbType = url.substring("jdbc:".length()); 156 if (dbType.indexOf(":") <= 0) { 157 throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'"); 158 } 159 dbType = dbType.substring(0, dbType.indexOf(":")); 160 161 String persistentUnit = "oozie-" + dbType; 162 163 // Checking existince of ORM file for DB type 164 String ormFile = "META-INF/" + persistentUnit + "-orm.xml"; 165 try { 166 IOUtils.getResourceAsStream(ormFile, -1); 167 } 168 catch (IOException ex) { 169 throw new ServiceException(ErrorCode.E0609, dbType, ormFile); 170 } 171 172 String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}"; 173 connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn); 174 Properties props = new Properties(); 175 if (autoSchemaCreation) { 176 connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; 177 props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)"); 178 } 179 else if (validateDbConn) { 180 // validation can be done only if the schema already exist, else a 181 // connection cannot be obtained to create the schema. 182 String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval; 183 String num = "numTestsPerEvictionRun=" + evictionNum; 184 connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num; 185 connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN"; 186 connProps = MessageFormat.format(connProps, dbSchema); 187 } 188 else { 189 connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; 190 } 191 if (connPropsConfig != null) { 192 connProps += "," + connPropsConfig; 193 } 194 props.setProperty("openjpa.ConnectionProperties", connProps); 195 196 props.setProperty("openjpa.ConnectionDriverName", dataSource); 197 198 factory = Persistence.createEntityManagerFactory(persistentUnit, props); 199 200 EntityManager entityManager = getEntityManager(); 201 entityManager.find(WorkflowActionBean.class, 1); 202 entityManager.find(WorkflowJobBean.class, 1); 203 entityManager.find(CoordinatorActionBean.class, 1); 204 entityManager.find(CoordinatorJobBean.class, 1); 205 entityManager.find(SLAEventBean.class, 1); 206 entityManager.find(JsonSLAEvent.class, 1); 207 entityManager.find(BundleJobBean.class, 1); 208 entityManager.find(BundleActionBean.class, 1); 209 entityManager.find(SLARegistrationBean.class, 1); 210 entityManager.find(SLASummaryBean.class, 1); 211 212 LOG.info(XLog.STD, "All entities initialized"); 213 // need to use a pseudo no-op transaction so all entities, datasource 214 // and connection pool are initialized one time only 215 entityManager.getTransaction().begin(); 216 OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory; 217 // Mask the password with '***' 218 String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,"); 219 LOG.info("JPA configuration: {0}", logMsg); 220 entityManager.getTransaction().commit(); 221 entityManager.close(); 222 try { 223 CodecFactory.initialize(conf); 224 } 225 catch (Exception ex) { 226 throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex); 227 } 228 } 229 230 /** 231 * Destroy the JPAService 232 */ 233 public void destroy() { 234 if (factory != null && factory.isOpen()) { 235 factory.close(); 236 } 237 } 238 239 /** 240 * Execute a {@link JPAExecutor}. 241 * 242 * @param executor JPAExecutor to execute. 243 * @return return value of the JPAExecutor. 244 * @throws JPAExecutorException thrown if an jpa executor failed 245 */ 246 public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException { 247 EntityManager em = getEntityManager(); 248 Instrumentation.Cron cron = new Instrumentation.Cron(); 249 try { 250 LOG.trace("Executing JPAExecutor [{0}]", executor.getName()); 251 if (instr != null) { 252 instr.incr(INSTRUMENTATION_GROUP_JPA, executor.getName(), 1); 253 } 254 cron.start(); 255 em.getTransaction().begin(); 256 T t = executor.execute(em); 257 if (em.getTransaction().isActive()) { 258 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { 259 throw new RuntimeException("Skipping Commit for Failover Testing"); 260 } 261 262 em.getTransaction().commit(); 263 } 264 return t; 265 } 266 catch (PersistenceException e) { 267 throw new JPAExecutorException(ErrorCode.E0603, e); 268 } 269 finally { 270 cron.stop(); 271 if (instr != null) { 272 instr.addCron(INSTRUMENTATION_GROUP_JPA, executor.getName(), cron); 273 } 274 try { 275 if (em.getTransaction().isActive()) { 276 LOG.warn("JPAExecutor [{0}] ended with an active transaction, rolling back", executor.getName()); 277 em.getTransaction().rollback(); 278 } 279 } 280 catch (Exception ex) { 281 LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex 282 .getMessage(), ex); 283 } 284 try { 285 if (em.isOpen()) { 286 em.close(); 287 } 288 else { 289 LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName()); 290 } 291 } 292 catch (Exception ex) { 293 LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex 294 .getMessage(), ex); 295 } 296 } 297 } 298 299 /** 300 * Execute an UPDATE query 301 * @param namedQueryName the name of query to be executed 302 * @param query query instance to be executed 303 * @param em Entity Manager 304 * @return Integer that query returns, which corresponds to the number of rows updated 305 * @throws JPAExecutorException 306 */ 307 public int executeUpdate(String namedQueryName, Query query, EntityManager em) throws JPAExecutorException { 308 Instrumentation.Cron cron = new Instrumentation.Cron(); 309 try { 310 311 LOG.trace("Executing Update/Delete Query [{0}]", namedQueryName); 312 if (instr != null) { 313 instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1); 314 } 315 cron.start(); 316 em.getTransaction().begin(); 317 int ret = query.executeUpdate(); 318 if (em.getTransaction().isActive()) { 319 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { 320 throw new RuntimeException("Skipping Commit for Failover Testing"); 321 } 322 em.getTransaction().commit(); 323 } 324 return ret; 325 } 326 catch (PersistenceException e) { 327 throw new JPAExecutorException(ErrorCode.E0603, e); 328 } 329 finally { 330 processFinally(em, cron, namedQueryName, true); 331 } 332 } 333 334 public static class QueryEntry<E extends Enum<E>> { 335 E namedQuery; 336 Query query; 337 338 public QueryEntry(E namedQuery, Query query) { 339 this.namedQuery = namedQuery; 340 this.query = query; 341 } 342 343 public Query getQuery() { 344 return this.query; 345 } 346 347 public E getQueryName() { 348 return this.namedQuery; 349 } 350 } 351 352 private void processFinally(EntityManager em, Instrumentation.Cron cron, String name, boolean checkActive) { 353 cron.stop(); 354 if (instr != null) { 355 instr.addCron(INSTRUMENTATION_GROUP_JPA, name, cron); 356 } 357 if (checkActive) { 358 try { 359 if (em.getTransaction().isActive()) { 360 LOG.warn("[{0}] ended with an active transaction, rolling back", name); 361 em.getTransaction().rollback(); 362 } 363 } 364 catch (Exception ex) { 365 LOG.warn("Could not check/rollback transaction after [{0}], {1}", name, 366 ex.getMessage(), ex); 367 } 368 } 369 try { 370 if (em.isOpen()) { 371 em.close(); 372 } 373 else { 374 LOG.warn("[{0}] closed the EntityManager, it should not!", name); 375 } 376 } 377 catch (Exception ex) { 378 LOG.warn("Could not close EntityManager after [{0}], {1}", name, ex.getMessage(), ex); 379 } 380 } 381 382 /** 383 * Execute multiple update/insert queries in one transaction 384 * @param insertBeans list of beans to be inserted 385 * @param updateQueryList list of update queries 386 * @param deleteBeans list of beans to be deleted 387 * @param em Entity Manager 388 * @throws JPAExecutorException 389 */ 390 public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertBeans, List<QueryEntry> updateQueryList, 391 Collection<JsonBean> deleteBeans, EntityManager em) throws JPAExecutorException { 392 Instrumentation.Cron cron = new Instrumentation.Cron(); 393 try { 394 395 LOG.trace("Executing Queries in Batch"); 396 cron.start(); 397 em.getTransaction().begin(); 398 if (updateQueryList != null && updateQueryList.size() > 0) { 399 for (QueryEntry q : updateQueryList) { 400 if (instr != null) { 401 instr.incr(INSTRUMENTATION_GROUP_JPA, q.getQueryName().name(), 1); 402 } 403 q.getQuery().executeUpdate(); 404 } 405 } 406 if (insertBeans != null && insertBeans.size() > 0) { 407 for (JsonBean bean : insertBeans) { 408 em.persist(bean); 409 } 410 } 411 if (deleteBeans != null && deleteBeans.size() > 0) { 412 for (JsonBean bean : deleteBeans) { 413 em.remove(em.merge(bean)); 414 } 415 } 416 if (em.getTransaction().isActive()) { 417 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { 418 throw new RuntimeException("Skipping Commit for Failover Testing"); 419 } 420 em.getTransaction().commit(); 421 } 422 } 423 catch (PersistenceException e) { 424 throw new JPAExecutorException(ErrorCode.E0603, e); 425 } 426 finally { 427 processFinally(em, cron, "batchqueryexecutor", true); 428 } 429 } 430 431 /** 432 * Execute a SELECT query 433 * @param namedQueryName the name of query to be executed 434 * @param query query instance to be executed 435 * @param em Entity Manager 436 * @return object that matches the query 437 */ 438 public Object executeGet(String namedQueryName, Query query, EntityManager em) { 439 Instrumentation.Cron cron = new Instrumentation.Cron(); 440 try { 441 442 LOG.trace("Executing Select Query to Get a Single row [{0}]", namedQueryName); 443 if (instr != null) { 444 instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1); 445 } 446 447 cron.start(); 448 Object obj = null; 449 try { 450 obj = query.getSingleResult(); 451 } 452 catch (NoResultException e) { 453 // return null when no matched result 454 } 455 return obj; 456 } 457 finally { 458 processFinally(em, cron, namedQueryName, false); 459 } 460 } 461 462 /** 463 * Execute a SELECT query to get list of results 464 * @param namedQueryName the name of query to be executed 465 * @param query query instance to be executed 466 * @param em Entity Manager 467 * @return list containing results that match the query 468 */ 469 public List<?> executeGetList(String namedQueryName, Query query, EntityManager em) { 470 Instrumentation.Cron cron = new Instrumentation.Cron(); 471 try { 472 473 LOG.trace("Executing Select Query to Get Multiple Rows [{0}]", namedQueryName); 474 if (instr != null) { 475 instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1); 476 } 477 478 cron.start(); 479 List<?> resultList = null; 480 try { 481 resultList = query.getResultList(); 482 } 483 catch (NoResultException e) { 484 // return null when no matched result 485 } 486 return resultList; 487 } 488 finally { 489 processFinally(em, cron, namedQueryName, false); 490 } 491 } 492 493 /** 494 * Return an EntityManager. Used by the StoreService. Once the StoreService is removed this method must be removed. 495 * 496 * @return an entity manager 497 */ 498 public EntityManager getEntityManager() { 499 return factory.createEntityManager(); 500 } 501 502}