001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.io.IOException; 022import java.text.MessageFormat; 023import java.util.Collection; 024import java.util.List; 025import java.util.Properties; 026 027import javax.persistence.EntityManager; 028import javax.persistence.EntityManagerFactory; 029import javax.persistence.NoResultException; 030import javax.persistence.Persistence; 031import javax.persistence.PersistenceException; 032import javax.persistence.Query; 033 034import org.apache.commons.dbcp.BasicDataSource; 035import org.apache.commons.lang.StringUtils; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.oozie.BundleActionBean; 038import org.apache.oozie.BundleJobBean; 039import org.apache.oozie.CoordinatorActionBean; 040import org.apache.oozie.CoordinatorJobBean; 041import org.apache.oozie.ErrorCode; 042import org.apache.oozie.FaultInjection; 043import org.apache.oozie.SLAEventBean; 044import org.apache.oozie.WorkflowActionBean; 045import org.apache.oozie.WorkflowJobBean; 046import org.apache.oozie.client.rest.JsonBean; 047import org.apache.oozie.client.rest.JsonSLAEvent; 048import org.apache.oozie.compression.CodecFactory; 049import org.apache.oozie.executor.jpa.JPAExecutor; 050import org.apache.oozie.executor.jpa.JPAExecutorException; 051import org.apache.oozie.sla.SLARegistrationBean; 052import org.apache.oozie.sla.SLASummaryBean; 053import org.apache.oozie.util.IOUtils; 054import org.apache.oozie.util.Instrumentable; 055import org.apache.oozie.util.Instrumentation; 056import org.apache.oozie.util.XLog; 057import org.apache.openjpa.lib.jdbc.DecoratingDataSource; 058import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI; 059 060/** 061 * Service that manages JPA and executes {@link JPAExecutor}. 062 */ 063@SuppressWarnings("deprecation") 064public class JPAService implements Service, Instrumentable { 065 private static final String INSTRUMENTATION_GROUP_JPA = "jpa"; 066 067 public static final String CONF_DB_SCHEMA = "oozie.db.schema.name"; 068 069 public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService."; 070 public static final String CONF_URL = CONF_PREFIX + "jdbc.url"; 071 public static final String CONF_DRIVER = CONF_PREFIX + "jdbc.driver"; 072 public static final String CONF_USERNAME = CONF_PREFIX + "jdbc.username"; 073 public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password"; 074 public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source"; 075 public static final String CONF_CONN_PROPERTIES = CONF_PREFIX + "connection.properties"; 076 public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn"; 077 public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema"; 078 public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection"; 079 public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval"; 080 public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num"; 081 public static final String CONF_OPENJPA_BROKER_IMPL = CONF_PREFIX + "openjpa.BrokerImpl"; 082 083 084 085 private EntityManagerFactory factory; 086 private Instrumentation instr; 087 088 private static XLog LOG; 089 090 /** 091 * Return the public interface of the service. 092 * 093 * @return {@link JPAService}. 094 */ 095 public Class<? extends Service> getInterface() { 096 return JPAService.class; 097 } 098 099 @Override 100 public void instrument(Instrumentation instr) { 101 this.instr = instr; 102 103 final BasicDataSource dataSource = getBasicDataSource(); 104 if (dataSource != null) { 105 instr.addSampler("jdbc", "connections.active", 60, 1, new Instrumentation.Variable<Long>() { 106 @Override 107 public Long getValue() { 108 return (long) dataSource.getNumActive(); 109 } 110 }); 111 instr.addSampler("jdbc", "connections.idle", 60, 1, new Instrumentation.Variable<Long>() { 112 @Override 113 public Long getValue() { 114 return (long) dataSource.getNumIdle(); 115 } 116 }); 117 } 118 } 119 120 private BasicDataSource getBasicDataSource() { 121 // Get the BasicDataSource object; it could be wrapped in a DecoratingDataSource 122 // It might also not be a BasicDataSource if the user configured something different 123 BasicDataSource basicDataSource = null; 124 OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory; 125 Object connectionFactory = spi.getConfiguration().getConnectionFactory(); 126 if (connectionFactory instanceof DecoratingDataSource) { 127 DecoratingDataSource decoratingDataSource = (DecoratingDataSource) connectionFactory; 128 basicDataSource = (BasicDataSource) decoratingDataSource.getInnermostDelegate(); 129 } else if (connectionFactory instanceof BasicDataSource) { 130 basicDataSource = (BasicDataSource) connectionFactory; 131 } 132 return basicDataSource; 133 } 134 135 /** 136 * Initializes the {@link JPAService}. 137 * 138 * @param services services instance. 139 */ 140 public void init(Services services) throws ServiceException { 141 LOG = XLog.getLog(JPAService.class); 142 Configuration conf = services.getConf(); 143 String dbSchema = ConfigurationService.get(conf, CONF_DB_SCHEMA); 144 String url = ConfigurationService.get(conf, CONF_URL); 145 String driver = ConfigurationService.get(conf, CONF_DRIVER); 146 String user = ConfigurationService.get(conf, CONF_USERNAME); 147 String password = ConfigurationService.getPassword(conf, CONF_PASSWORD).trim(); 148 String maxConn = ConfigurationService.get(conf, CONF_MAX_ACTIVE_CONN).trim(); 149 String dataSource = ConfigurationService.get(conf, CONF_CONN_DATA_SOURCE); 150 String connPropsConfig = ConfigurationService.get(conf, CONF_CONN_PROPERTIES); 151 String brokerImplConfig = ConfigurationService.get(conf, CONF_OPENJPA_BROKER_IMPL); 152 boolean autoSchemaCreation = ConfigurationService.getBoolean(conf, CONF_CREATE_DB_SCHEMA); 153 boolean validateDbConn = ConfigurationService.getBoolean(conf, CONF_VALIDATE_DB_CONN); 154 String evictionInterval = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL).trim(); 155 String evictionNum = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_NUM).trim(); 156 157 if (!url.startsWith("jdbc:")) { 158 throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'"); 159 } 160 String dbType = url.substring("jdbc:".length()); 161 if (dbType.indexOf(":") <= 0) { 162 throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'"); 163 } 164 dbType = dbType.substring(0, dbType.indexOf(":")); 165 166 String persistentUnit = "oozie-" + dbType; 167 168 // Checking existince of ORM file for DB type 169 String ormFile = "META-INF/" + persistentUnit + "-orm.xml"; 170 try { 171 IOUtils.getResourceAsStream(ormFile, -1); 172 } 173 catch (IOException ex) { 174 throw new ServiceException(ErrorCode.E0609, dbType, ormFile); 175 } 176 177 // support for mysql replication urls "jdbc:mysql:replication://master:port,slave:port[,slave:port]/db" 178 if (url.startsWith("jdbc:mysql:replication")) { 179 url = "\"".concat(url).concat("\""); 180 LOG.info("A jdbc replication url is provided. Url: [{0}]", url); 181 } 182 183 String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}"; 184 connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn); 185 Properties props = new Properties(); 186 if (autoSchemaCreation) { 187 connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; 188 props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)"); 189 } 190 else if (validateDbConn) { 191 // validation can be done only if the schema already exist, else a 192 // connection cannot be obtained to create the schema. 193 String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval; 194 String num = "numTestsPerEvictionRun=" + evictionNum; 195 connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num; 196 connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN"; 197 connProps = MessageFormat.format(connProps, dbSchema); 198 } 199 else { 200 connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; 201 } 202 if (connPropsConfig != null) { 203 connProps += "," + connPropsConfig; 204 } 205 props.setProperty("openjpa.ConnectionProperties", connProps); 206 207 props.setProperty("openjpa.ConnectionDriverName", dataSource); 208 if (!StringUtils.isEmpty(brokerImplConfig)) { 209 props.setProperty("openjpa.BrokerImpl", brokerImplConfig); 210 LOG.info("Setting openjpa.BrokerImpl to {0}", brokerImplConfig); 211 } 212 213 factory = Persistence.createEntityManagerFactory(persistentUnit, props); 214 215 EntityManager entityManager = getEntityManager(); 216 entityManager.find(WorkflowActionBean.class, 1); 217 entityManager.find(WorkflowJobBean.class, 1); 218 entityManager.find(CoordinatorActionBean.class, 1); 219 entityManager.find(CoordinatorJobBean.class, 1); 220 entityManager.find(SLAEventBean.class, 1); 221 entityManager.find(JsonSLAEvent.class, 1); 222 entityManager.find(BundleJobBean.class, 1); 223 entityManager.find(BundleActionBean.class, 1); 224 entityManager.find(SLARegistrationBean.class, 1); 225 entityManager.find(SLASummaryBean.class, 1); 226 227 LOG.info(XLog.STD, "All entities initialized"); 228 // need to use a pseudo no-op transaction so all entities, datasource 229 // and connection pool are initialized one time only 230 entityManager.getTransaction().begin(); 231 OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory; 232 // Mask the password with '***' 233 String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,"); 234 LOG.info("JPA configuration: {0}", logMsg); 235 entityManager.getTransaction().commit(); 236 entityManager.close(); 237 try { 238 CodecFactory.initialize(conf); 239 } 240 catch (Exception ex) { 241 throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex); 242 } 243 } 244 245 /** 246 * Destroy the JPAService 247 */ 248 public void destroy() { 249 if (factory != null && factory.isOpen()) { 250 factory.close(); 251 } 252 } 253 254 /** 255 * Execute a {@link JPAExecutor}. 256 * 257 * @param executor JPAExecutor to execute. 258 * @return return value of the JPAExecutor. 259 * @throws JPAExecutorException thrown if an jpa executor failed 260 */ 261 public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException { 262 EntityManager em = getEntityManager(); 263 Instrumentation.Cron cron = new Instrumentation.Cron(); 264 try { 265 LOG.trace("Executing JPAExecutor [{0}]", executor.getName()); 266 if (instr != null) { 267 instr.incr(INSTRUMENTATION_GROUP_JPA, executor.getName(), 1); 268 } 269 cron.start(); 270 em.getTransaction().begin(); 271 T t = executor.execute(em); 272 if (em.getTransaction().isActive()) { 273 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { 274 throw new RuntimeException("Skipping Commit for Failover Testing"); 275 } 276 277 em.getTransaction().commit(); 278 } 279 return t; 280 } 281 catch (PersistenceException e) { 282 throw new JPAExecutorException(ErrorCode.E0603, e); 283 } 284 finally { 285 cron.stop(); 286 if (instr != null) { 287 instr.addCron(INSTRUMENTATION_GROUP_JPA, executor.getName(), cron); 288 } 289 try { 290 if (em.getTransaction().isActive()) { 291 LOG.warn("JPAExecutor [{0}] ended with an active transaction, rolling back", executor.getName()); 292 em.getTransaction().rollback(); 293 } 294 } 295 catch (Exception ex) { 296 LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex 297 .getMessage(), ex); 298 } 299 try { 300 if (em.isOpen()) { 301 em.close(); 302 } 303 else { 304 LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName()); 305 } 306 } 307 catch (Exception ex) { 308 LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex 309 .getMessage(), ex); 310 } 311 } 312 } 313 314 /** 315 * Execute an UPDATE query 316 * @param namedQueryName the name of query to be executed 317 * @param query query instance to be executed 318 * @param em Entity Manager 319 * @return Integer that query returns, which corresponds to the number of rows updated 320 * @throws JPAExecutorException 321 */ 322 public int executeUpdate(String namedQueryName, Query query, EntityManager em) throws JPAExecutorException { 323 Instrumentation.Cron cron = new Instrumentation.Cron(); 324 try { 325 326 LOG.trace("Executing Update/Delete Query [{0}]", namedQueryName); 327 if (instr != null) { 328 instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1); 329 } 330 cron.start(); 331 em.getTransaction().begin(); 332 int ret = query.executeUpdate(); 333 if (em.getTransaction().isActive()) { 334 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { 335 throw new RuntimeException("Skipping Commit for Failover Testing"); 336 } 337 em.getTransaction().commit(); 338 } 339 return ret; 340 } 341 catch (PersistenceException e) { 342 throw new JPAExecutorException(ErrorCode.E0603, e); 343 } 344 finally { 345 processFinally(em, cron, namedQueryName, true); 346 } 347 } 348 349 public static class QueryEntry<E extends Enum<E>> { 350 E namedQuery; 351 Query query; 352 353 public QueryEntry(E namedQuery, Query query) { 354 this.namedQuery = namedQuery; 355 this.query = query; 356 } 357 358 public Query getQuery() { 359 return this.query; 360 } 361 362 public E getQueryName() { 363 return this.namedQuery; 364 } 365 } 366 367 private void processFinally(EntityManager em, Instrumentation.Cron cron, String name, boolean checkActive) { 368 cron.stop(); 369 if (instr != null) { 370 instr.addCron(INSTRUMENTATION_GROUP_JPA, name, cron); 371 } 372 if (checkActive) { 373 try { 374 if (em.getTransaction().isActive()) { 375 LOG.warn("[{0}] ended with an active transaction, rolling back", name); 376 em.getTransaction().rollback(); 377 } 378 } 379 catch (Exception ex) { 380 LOG.warn("Could not check/rollback transaction after [{0}], {1}", name, 381 ex.getMessage(), ex); 382 } 383 } 384 try { 385 if (em.isOpen()) { 386 em.close(); 387 } 388 else { 389 LOG.warn("[{0}] closed the EntityManager, it should not!", name); 390 } 391 } 392 catch (Exception ex) { 393 LOG.warn("Could not close EntityManager after [{0}], {1}", name, ex.getMessage(), ex); 394 } 395 } 396 397 /** 398 * Execute multiple update/insert queries in one transaction 399 * @param insertBeans list of beans to be inserted 400 * @param updateQueryList list of update queries 401 * @param deleteBeans list of beans to be deleted 402 * @param em Entity Manager 403 * @throws JPAExecutorException 404 */ 405 public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertBeans, List<QueryEntry> updateQueryList, 406 Collection<JsonBean> deleteBeans, EntityManager em) throws JPAExecutorException { 407 Instrumentation.Cron cron = new Instrumentation.Cron(); 408 try { 409 410 LOG.trace("Executing Queries in Batch"); 411 cron.start(); 412 em.getTransaction().begin(); 413 if (updateQueryList != null && updateQueryList.size() > 0) { 414 for (QueryEntry q : updateQueryList) { 415 if (instr != null) { 416 instr.incr(INSTRUMENTATION_GROUP_JPA, q.getQueryName().name(), 1); 417 } 418 q.getQuery().executeUpdate(); 419 } 420 } 421 if (insertBeans != null && insertBeans.size() > 0) { 422 for (JsonBean bean : insertBeans) { 423 em.persist(bean); 424 } 425 } 426 if (deleteBeans != null && deleteBeans.size() > 0) { 427 for (JsonBean bean : deleteBeans) { 428 em.remove(em.merge(bean)); 429 } 430 } 431 if (em.getTransaction().isActive()) { 432 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { 433 throw new RuntimeException("Skipping Commit for Failover Testing"); 434 } 435 em.getTransaction().commit(); 436 } 437 } 438 catch (PersistenceException e) { 439 throw new JPAExecutorException(ErrorCode.E0603, e); 440 } 441 finally { 442 processFinally(em, cron, "batchqueryexecutor", true); 443 } 444 } 445 446 /** 447 * Execute a SELECT query 448 * @param namedQueryName the name of query to be executed 449 * @param query query instance to be executed 450 * @param em Entity Manager 451 * @return object that matches the query 452 */ 453 public Object executeGet(String namedQueryName, Query query, EntityManager em) { 454 Instrumentation.Cron cron = new Instrumentation.Cron(); 455 try { 456 457 LOG.trace("Executing Select Query to Get a Single row [{0}]", namedQueryName); 458 if (instr != null) { 459 instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1); 460 } 461 462 cron.start(); 463 Object obj = null; 464 try { 465 obj = query.getSingleResult(); 466 } 467 catch (NoResultException e) { 468 // return null when no matched result 469 } 470 return obj; 471 } 472 finally { 473 processFinally(em, cron, namedQueryName, false); 474 } 475 } 476 477 /** 478 * Execute a SELECT query to get list of results 479 * @param namedQueryName the name of query to be executed 480 * @param query query instance to be executed 481 * @param em Entity Manager 482 * @return list containing results that match the query 483 */ 484 public List<?> executeGetList(String namedQueryName, Query query, EntityManager em) { 485 Instrumentation.Cron cron = new Instrumentation.Cron(); 486 try { 487 488 LOG.trace("Executing Select Query to Get Multiple Rows [{0}]", namedQueryName); 489 if (instr != null) { 490 instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1); 491 } 492 493 cron.start(); 494 List<?> resultList = null; 495 try { 496 resultList = query.getResultList(); 497 } 498 catch (NoResultException e) { 499 // return null when no matched result 500 } 501 return resultList; 502 } 503 finally { 504 processFinally(em, cron, namedQueryName, false); 505 } 506 } 507 508 /** 509 * Return an EntityManager. Used by the StoreService. Once the StoreService is removed this method must be removed. 510 * 511 * @return an entity manager 512 */ 513 public EntityManager getEntityManager() { 514 return factory.createEntityManager(); 515 } 516 517}