001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.IOException;
022import java.text.MessageFormat;
023import java.util.Collection;
024import java.util.List;
025import java.util.Properties;
026
027import javax.persistence.EntityManager;
028import javax.persistence.EntityManagerFactory;
029import javax.persistence.NoResultException;
030import javax.persistence.Persistence;
031import javax.persistence.PersistenceException;
032import javax.persistence.Query;
033
034import org.apache.commons.dbcp.BasicDataSource;
035import org.apache.commons.lang.StringUtils;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.oozie.BundleActionBean;
038import org.apache.oozie.BundleJobBean;
039import org.apache.oozie.CoordinatorActionBean;
040import org.apache.oozie.CoordinatorJobBean;
041import org.apache.oozie.ErrorCode;
042import org.apache.oozie.FaultInjection;
043import org.apache.oozie.SLAEventBean;
044import org.apache.oozie.WorkflowActionBean;
045import org.apache.oozie.WorkflowJobBean;
046import org.apache.oozie.client.rest.JsonBean;
047import org.apache.oozie.client.rest.JsonSLAEvent;
048import org.apache.oozie.compression.CodecFactory;
049import org.apache.oozie.executor.jpa.JPAExecutor;
050import org.apache.oozie.executor.jpa.JPAExecutorException;
051import org.apache.oozie.sla.SLARegistrationBean;
052import org.apache.oozie.sla.SLASummaryBean;
053import org.apache.oozie.util.IOUtils;
054import org.apache.oozie.util.Instrumentable;
055import org.apache.oozie.util.Instrumentation;
056import org.apache.oozie.util.XLog;
057import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
058import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
059
060/**
061 * Service that manages JPA and executes {@link JPAExecutor}.
062 */
063@SuppressWarnings("deprecation")
064public class JPAService implements Service, Instrumentable {
065    private static final String INSTRUMENTATION_GROUP_JPA = "jpa";
066
067    public static final String CONF_DB_SCHEMA = "oozie.db.schema.name";
068
069    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService.";
070    public static final String CONF_URL = CONF_PREFIX + "jdbc.url";
071    public static final String CONF_DRIVER = CONF_PREFIX + "jdbc.driver";
072    public static final String CONF_USERNAME = CONF_PREFIX + "jdbc.username";
073    public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password";
074    public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source";
075    public static final String CONF_CONN_PROPERTIES = CONF_PREFIX + "connection.properties";
076    public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn";
077    public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema";
078    public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection";
079    public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval";
080    public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num";
081    public static final String CONF_OPENJPA_BROKER_IMPL = CONF_PREFIX + "openjpa.BrokerImpl";
082
083
084
085    private EntityManagerFactory factory;
086    private Instrumentation instr;
087
088    private static XLog LOG;
089
090    /**
091     * Return the public interface of the service.
092     *
093     * @return {@link JPAService}.
094     */
095    public Class<? extends Service> getInterface() {
096        return JPAService.class;
097    }
098
099    @Override
100    public void instrument(Instrumentation instr) {
101        this.instr = instr;
102
103        final BasicDataSource dataSource = getBasicDataSource();
104        if (dataSource != null) {
105            instr.addSampler("jdbc", "connections.active", 60, 1, new Instrumentation.Variable<Long>() {
106                @Override
107                public Long getValue() {
108                    return (long) dataSource.getNumActive();
109                }
110            });
111            instr.addSampler("jdbc", "connections.idle", 60, 1, new Instrumentation.Variable<Long>() {
112                @Override
113                public Long getValue() {
114                    return (long) dataSource.getNumIdle();
115                }
116            });
117        }
118    }
119
120    private BasicDataSource getBasicDataSource() {
121        // Get the BasicDataSource object; it could be wrapped in a DecoratingDataSource
122        // It might also not be a BasicDataSource if the user configured something different
123        BasicDataSource basicDataSource = null;
124        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory;
125        Object connectionFactory = spi.getConfiguration().getConnectionFactory();
126        if (connectionFactory instanceof DecoratingDataSource) {
127            DecoratingDataSource decoratingDataSource = (DecoratingDataSource) connectionFactory;
128            basicDataSource = (BasicDataSource) decoratingDataSource.getInnermostDelegate();
129        } else if (connectionFactory instanceof BasicDataSource) {
130            basicDataSource = (BasicDataSource) connectionFactory;
131        }
132        return basicDataSource;
133    }
134
135    /**
136     * Initializes the {@link JPAService}.
137     *
138     * @param services services instance.
139     */
140    public void init(Services services) throws ServiceException {
141        LOG = XLog.getLog(JPAService.class);
142        Configuration conf = services.getConf();
143        String dbSchema = ConfigurationService.get(conf, CONF_DB_SCHEMA);
144        String url = ConfigurationService.get(conf, CONF_URL);
145        String driver = ConfigurationService.get(conf, CONF_DRIVER);
146        String user = ConfigurationService.get(conf, CONF_USERNAME);
147        String password = ConfigurationService.getPassword(conf, CONF_PASSWORD).trim();
148        String maxConn = ConfigurationService.get(conf, CONF_MAX_ACTIVE_CONN).trim();
149        String dataSource = ConfigurationService.get(conf, CONF_CONN_DATA_SOURCE);
150        String connPropsConfig = ConfigurationService.get(conf, CONF_CONN_PROPERTIES);
151        String brokerImplConfig = ConfigurationService.get(conf, CONF_OPENJPA_BROKER_IMPL);
152        boolean autoSchemaCreation = ConfigurationService.getBoolean(conf, CONF_CREATE_DB_SCHEMA);
153        boolean validateDbConn = ConfigurationService.getBoolean(conf, CONF_VALIDATE_DB_CONN);
154        String evictionInterval = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
155        String evictionNum = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_NUM).trim();
156
157        if (!url.startsWith("jdbc:")) {
158            throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'");
159        }
160        String dbType = url.substring("jdbc:".length());
161        if (dbType.indexOf(":") <= 0) {
162            throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
163        }
164        dbType = dbType.substring(0, dbType.indexOf(":"));
165
166        String persistentUnit = "oozie-" + dbType;
167
168        // Checking existince of ORM file for DB type
169        String ormFile = "META-INF/" + persistentUnit + "-orm.xml";
170        try {
171            IOUtils.getResourceAsStream(ormFile, -1);
172        }
173        catch (IOException ex) {
174            throw new ServiceException(ErrorCode.E0609, dbType, ormFile);
175        }
176
177        // support for mysql replication urls "jdbc:mysql:replication://master:port,slave:port[,slave:port]/db"
178        if (url.startsWith("jdbc:mysql:replication")) {
179            url = "\"".concat(url).concat("\"");
180            LOG.info("A jdbc replication url is provided. Url: [{0}]", url);
181        }
182
183        String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
184        connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
185        Properties props = new Properties();
186        if (autoSchemaCreation) {
187            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
188            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
189        }
190        else if (validateDbConn) {
191            // validation can be done only if the schema already exist, else a
192            // connection cannot be obtained to create the schema.
193            String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
194            String num = "numTestsPerEvictionRun=" + evictionNum;
195            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
196            connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN";
197            connProps = MessageFormat.format(connProps, dbSchema);
198        }
199        else {
200            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
201        }
202        if (connPropsConfig != null) {
203            connProps += "," + connPropsConfig;
204        }
205        props.setProperty("openjpa.ConnectionProperties", connProps);
206
207        props.setProperty("openjpa.ConnectionDriverName", dataSource);
208        if (!StringUtils.isEmpty(brokerImplConfig)) {
209            props.setProperty("openjpa.BrokerImpl", brokerImplConfig);
210            LOG.info("Setting openjpa.BrokerImpl to {0}", brokerImplConfig);
211        }
212
213        factory = Persistence.createEntityManagerFactory(persistentUnit, props);
214
215        EntityManager entityManager = getEntityManager();
216        entityManager.find(WorkflowActionBean.class, 1);
217        entityManager.find(WorkflowJobBean.class, 1);
218        entityManager.find(CoordinatorActionBean.class, 1);
219        entityManager.find(CoordinatorJobBean.class, 1);
220        entityManager.find(SLAEventBean.class, 1);
221        entityManager.find(JsonSLAEvent.class, 1);
222        entityManager.find(BundleJobBean.class, 1);
223        entityManager.find(BundleActionBean.class, 1);
224        entityManager.find(SLARegistrationBean.class, 1);
225        entityManager.find(SLASummaryBean.class, 1);
226
227        LOG.info(XLog.STD, "All entities initialized");
228        // need to use a pseudo no-op transaction so all entities, datasource
229        // and connection pool are initialized one time only
230        entityManager.getTransaction().begin();
231        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory;
232        // Mask the password with '***'
233        String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
234        LOG.info("JPA configuration: {0}", logMsg);
235        entityManager.getTransaction().commit();
236        entityManager.close();
237        try {
238            CodecFactory.initialize(conf);
239        }
240        catch (Exception ex) {
241            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex);
242        }
243    }
244
245    /**
246     * Destroy the JPAService
247     */
248    public void destroy() {
249        if (factory != null && factory.isOpen()) {
250            factory.close();
251        }
252    }
253
254    /**
255     * Execute a {@link JPAExecutor}.
256     *
257     * @param executor JPAExecutor to execute.
258     * @return return value of the JPAExecutor.
259     * @throws JPAExecutorException thrown if an jpa executor failed
260     */
261    public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException {
262        EntityManager em = getEntityManager();
263        Instrumentation.Cron cron = new Instrumentation.Cron();
264        try {
265            LOG.trace("Executing JPAExecutor [{0}]", executor.getName());
266            if (instr != null) {
267                instr.incr(INSTRUMENTATION_GROUP_JPA, executor.getName(), 1);
268            }
269            cron.start();
270            em.getTransaction().begin();
271            T t = executor.execute(em);
272            if (em.getTransaction().isActive()) {
273                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
274                    throw new RuntimeException("Skipping Commit for Failover Testing");
275                }
276
277                em.getTransaction().commit();
278            }
279            return t;
280        }
281        catch (PersistenceException e) {
282            throw new JPAExecutorException(ErrorCode.E0603, e);
283        }
284        finally {
285            cron.stop();
286            if (instr != null) {
287                instr.addCron(INSTRUMENTATION_GROUP_JPA, executor.getName(), cron);
288            }
289            try {
290                if (em.getTransaction().isActive()) {
291                    LOG.warn("JPAExecutor [{0}] ended with an active transaction, rolling back", executor.getName());
292                    em.getTransaction().rollback();
293                }
294            }
295            catch (Exception ex) {
296                LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex
297                        .getMessage(), ex);
298            }
299            try {
300                if (em.isOpen()) {
301                    em.close();
302                }
303                else {
304                    LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName());
305                }
306            }
307            catch (Exception ex) {
308                LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex
309                        .getMessage(), ex);
310            }
311        }
312    }
313
314    /**
315     * Execute an UPDATE query
316     * @param namedQueryName the name of query to be executed
317     * @param query query instance to be executed
318     * @param em Entity Manager
319     * @return Integer that query returns, which corresponds to the number of rows updated
320     * @throws JPAExecutorException
321     */
322    public int executeUpdate(String namedQueryName, Query query, EntityManager em) throws JPAExecutorException {
323        Instrumentation.Cron cron = new Instrumentation.Cron();
324        try {
325
326            LOG.trace("Executing Update/Delete Query [{0}]", namedQueryName);
327            if (instr != null) {
328                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
329            }
330            cron.start();
331            em.getTransaction().begin();
332            int ret = query.executeUpdate();
333            if (em.getTransaction().isActive()) {
334                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
335                    throw new RuntimeException("Skipping Commit for Failover Testing");
336                }
337                em.getTransaction().commit();
338            }
339            return ret;
340        }
341        catch (PersistenceException e) {
342            throw new JPAExecutorException(ErrorCode.E0603, e);
343        }
344        finally {
345            processFinally(em, cron, namedQueryName, true);
346        }
347    }
348
349    public static class QueryEntry<E extends Enum<E>> {
350        E namedQuery;
351        Query query;
352
353        public QueryEntry(E namedQuery, Query query) {
354            this.namedQuery = namedQuery;
355            this.query = query;
356        }
357
358        public Query getQuery() {
359            return this.query;
360        }
361
362        public E getQueryName() {
363            return this.namedQuery;
364        }
365    }
366
367    private void processFinally(EntityManager em, Instrumentation.Cron cron, String name, boolean checkActive) {
368        cron.stop();
369        if (instr != null) {
370            instr.addCron(INSTRUMENTATION_GROUP_JPA, name, cron);
371        }
372        if (checkActive) {
373            try {
374                if (em.getTransaction().isActive()) {
375                    LOG.warn("[{0}] ended with an active transaction, rolling back", name);
376                    em.getTransaction().rollback();
377                }
378            }
379            catch (Exception ex) {
380                LOG.warn("Could not check/rollback transaction after [{0}], {1}", name,
381                        ex.getMessage(), ex);
382            }
383        }
384        try {
385            if (em.isOpen()) {
386                em.close();
387            }
388            else {
389                LOG.warn("[{0}] closed the EntityManager, it should not!", name);
390            }
391        }
392        catch (Exception ex) {
393            LOG.warn("Could not close EntityManager after [{0}], {1}", name, ex.getMessage(), ex);
394        }
395    }
396
397    /**
398     * Execute multiple update/insert queries in one transaction
399     * @param insertBeans list of beans to be inserted
400     * @param updateQueryList list of update queries
401     * @param deleteBeans list of beans to be deleted
402     * @param em Entity Manager
403     * @throws JPAExecutorException
404     */
405    public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertBeans, List<QueryEntry> updateQueryList,
406            Collection<JsonBean> deleteBeans, EntityManager em) throws JPAExecutorException {
407        Instrumentation.Cron cron = new Instrumentation.Cron();
408        try {
409
410            LOG.trace("Executing Queries in Batch");
411            cron.start();
412            em.getTransaction().begin();
413            if (updateQueryList != null && updateQueryList.size() > 0) {
414                for (QueryEntry q : updateQueryList) {
415                    if (instr != null) {
416                        instr.incr(INSTRUMENTATION_GROUP_JPA, q.getQueryName().name(), 1);
417                    }
418                    q.getQuery().executeUpdate();
419                }
420            }
421            if (insertBeans != null && insertBeans.size() > 0) {
422                for (JsonBean bean : insertBeans) {
423                    em.persist(bean);
424                }
425            }
426            if (deleteBeans != null && deleteBeans.size() > 0) {
427                for (JsonBean bean : deleteBeans) {
428                    em.remove(em.merge(bean));
429                }
430            }
431            if (em.getTransaction().isActive()) {
432                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
433                    throw new RuntimeException("Skipping Commit for Failover Testing");
434                }
435                em.getTransaction().commit();
436            }
437        }
438        catch (PersistenceException e) {
439            throw new JPAExecutorException(ErrorCode.E0603, e);
440        }
441        finally {
442            processFinally(em, cron, "batchqueryexecutor", true);
443        }
444    }
445
446    /**
447     * Execute a SELECT query
448     * @param namedQueryName the name of query to be executed
449     * @param query query instance to be executed
450     * @param em Entity Manager
451     * @return object that matches the query
452     */
453    public Object executeGet(String namedQueryName, Query query, EntityManager em) {
454        Instrumentation.Cron cron = new Instrumentation.Cron();
455        try {
456
457            LOG.trace("Executing Select Query to Get a Single row  [{0}]", namedQueryName);
458            if (instr != null) {
459                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
460            }
461
462            cron.start();
463            Object obj = null;
464            try {
465                obj = query.getSingleResult();
466            }
467            catch (NoResultException e) {
468                // return null when no matched result
469            }
470            return obj;
471        }
472        finally {
473            processFinally(em, cron, namedQueryName, false);
474        }
475    }
476
477    /**
478     * Execute a SELECT query to get list of results
479     * @param namedQueryName the name of query to be executed
480     * @param query query instance to be executed
481     * @param em Entity Manager
482     * @return list containing results that match the query
483     */
484    public List<?> executeGetList(String namedQueryName, Query query, EntityManager em) {
485        Instrumentation.Cron cron = new Instrumentation.Cron();
486        try {
487
488            LOG.trace("Executing Select Query to Get Multiple Rows [{0}]", namedQueryName);
489            if (instr != null) {
490                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
491            }
492
493            cron.start();
494            List<?> resultList = null;
495            try {
496                resultList = query.getResultList();
497            }
498            catch (NoResultException e) {
499                // return null when no matched result
500            }
501            return resultList;
502        }
503        finally {
504            processFinally(em, cron, namedQueryName, false);
505        }
506    }
507
508    /**
509     * Return an EntityManager. Used by the StoreService. Once the StoreService is removed this method must be removed.
510     *
511     * @return an entity manager
512     */
513    public EntityManager getEntityManager() {
514        return factory.createEntityManager();
515    }
516
517}