/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.io.IOException;
import java.io.InputStream;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.NoResultException;
import javax.persistence.Persistence;
import javax.persistence.PersistenceException;
import javax.persistence.Query;

import org.apache.commons.dbcp.BasicDataSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.FaultInjection;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonSLAEvent;
import org.apache.oozie.compression.CodecFactory;
import org.apache.oozie.executor.jpa.JPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.sla.SLASummaryBean;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLog;
import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;

058/**
059 * Service that manages JPA and executes {@link JPAExecutor}.
060 */
061@SuppressWarnings("deprecation")
062public class JPAService implements Service, Instrumentable {
063    private static final String INSTRUMENTATION_GROUP_JPA = "jpa";
064
065    public static final String CONF_DB_SCHEMA = "oozie.db.schema.name";
066
067    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService.";
068    public static final String CONF_URL = CONF_PREFIX + "jdbc.url";
069    public static final String CONF_DRIVER = CONF_PREFIX + "jdbc.driver";
070    public static final String CONF_USERNAME = CONF_PREFIX + "jdbc.username";
071    public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password";
072    public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source";
073    public static final String CONF_CONN_PROPERTIES = CONF_PREFIX + "connection.properties";
074    public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn";
075    public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema";
076    public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection";
077    public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval";
078    public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num";
079
080
081    private EntityManagerFactory factory;
082    private Instrumentation instr;
083
084    private static XLog LOG;
085
086    /**
087     * Return the public interface of the service.
088     *
089     * @return {@link JPAService}.
090     */
091    public Class<? extends Service> getInterface() {
092        return JPAService.class;
093    }
094
095    @Override
096    public void instrument(Instrumentation instr) {
097        this.instr = instr;
098
099        final BasicDataSource dataSource = getBasicDataSource();
100        if (dataSource != null) {
101            instr.addSampler("jdbc", "connections.active", 60, 1, new Instrumentation.Variable<Long>() {
102                @Override
103                public Long getValue() {
104                    return (long) dataSource.getNumActive();
105                }
106            });
107            instr.addSampler("jdbc", "connections.idle", 60, 1, new Instrumentation.Variable<Long>() {
108                @Override
109                public Long getValue() {
110                    return (long) dataSource.getNumIdle();
111                }
112            });
113        }
114    }
115
116    private BasicDataSource getBasicDataSource() {
117        // Get the BasicDataSource object; it could be wrapped in a DecoratingDataSource
118        // It might also not be a BasicDataSource if the user configured something different
119        BasicDataSource basicDataSource = null;
120        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory;
121        Object connectionFactory = spi.getConfiguration().getConnectionFactory();
122        if (connectionFactory instanceof DecoratingDataSource) {
123            DecoratingDataSource decoratingDataSource = (DecoratingDataSource) connectionFactory;
124            basicDataSource = (BasicDataSource) decoratingDataSource.getInnermostDelegate();
125        } else if (connectionFactory instanceof BasicDataSource) {
126            basicDataSource = (BasicDataSource) connectionFactory;
127        }
128        return basicDataSource;
129    }
130
131    /**
132     * Initializes the {@link JPAService}.
133     *
134     * @param services services instance.
135     */
136    public void init(Services services) throws ServiceException {
137        LOG = XLog.getLog(JPAService.class);
138        Configuration conf = services.getConf();
139        String dbSchema = conf.get(CONF_DB_SCHEMA, "oozie");
140        String url = conf.get(CONF_URL, "jdbc:derby:${oozie.home.dir}/${oozie.db.schema.name}-db;create=true");
141        String driver = conf.get(CONF_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver");
142        String user = conf.get(CONF_USERNAME, "sa");
143        String password = conf.get(CONF_PASSWORD, "").trim();
144        String maxConn = conf.get(CONF_MAX_ACTIVE_CONN, "10").trim();
145        String dataSource = conf.get(CONF_CONN_DATA_SOURCE, "org.apache.commons.dbcp.BasicDataSource");
146        String connPropsConfig = conf.get(CONF_CONN_PROPERTIES);
147        boolean autoSchemaCreation = conf.getBoolean(CONF_CREATE_DB_SCHEMA, false);
148        boolean validateDbConn = conf.getBoolean(CONF_VALIDATE_DB_CONN, true);
149        String evictionInterval = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL, "300000").trim();
150        String evictionNum = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_NUM, "10").trim();
151
152        if (!url.startsWith("jdbc:")) {
153            throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'");
154        }
155        String dbType = url.substring("jdbc:".length());
156        if (dbType.indexOf(":") <= 0) {
157            throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
158        }
159        dbType = dbType.substring(0, dbType.indexOf(":"));
160
161        String persistentUnit = "oozie-" + dbType;
162
163        // Checking existince of ORM file for DB type
164        String ormFile = "META-INF/" + persistentUnit + "-orm.xml";
165        try {
166            IOUtils.getResourceAsStream(ormFile, -1);
167        }
168        catch (IOException ex) {
169            throw new ServiceException(ErrorCode.E0609, dbType, ormFile);
170        }
171
172        String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
173        connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
174        Properties props = new Properties();
175        if (autoSchemaCreation) {
176            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
177            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
178        }
179        else if (validateDbConn) {
180            // validation can be done only if the schema already exist, else a
181            // connection cannot be obtained to create the schema.
182            String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
183            String num = "numTestsPerEvictionRun=" + evictionNum;
184            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
185            connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN";
186            connProps = MessageFormat.format(connProps, dbSchema);
187        }
188        else {
189            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
190        }
191        if (connPropsConfig != null) {
192            connProps += "," + connPropsConfig;
193        }
194        props.setProperty("openjpa.ConnectionProperties", connProps);
195
196        props.setProperty("openjpa.ConnectionDriverName", dataSource);
197
198        factory = Persistence.createEntityManagerFactory(persistentUnit, props);
199
200        EntityManager entityManager = getEntityManager();
201        entityManager.find(WorkflowActionBean.class, 1);
202        entityManager.find(WorkflowJobBean.class, 1);
203        entityManager.find(CoordinatorActionBean.class, 1);
204        entityManager.find(CoordinatorJobBean.class, 1);
205        entityManager.find(SLAEventBean.class, 1);
206        entityManager.find(JsonSLAEvent.class, 1);
207        entityManager.find(BundleJobBean.class, 1);
208        entityManager.find(BundleActionBean.class, 1);
209        entityManager.find(SLARegistrationBean.class, 1);
210        entityManager.find(SLASummaryBean.class, 1);
211
212        LOG.info(XLog.STD, "All entities initialized");
213        // need to use a pseudo no-op transaction so all entities, datasource
214        // and connection pool are initialized one time only
215        entityManager.getTransaction().begin();
216        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory;
217        // Mask the password with '***'
218        String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
219        LOG.info("JPA configuration: {0}", logMsg);
220        entityManager.getTransaction().commit();
221        entityManager.close();
222        try {
223            CodecFactory.initialize(conf);
224        }
225        catch (Exception ex) {
226            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex);
227        }
228    }
229
230    /**
231     * Destroy the JPAService
232     */
233    public void destroy() {
234        if (factory != null && factory.isOpen()) {
235            factory.close();
236        }
237    }
238
239    /**
240     * Execute a {@link JPAExecutor}.
241     *
242     * @param executor JPAExecutor to execute.
243     * @return return value of the JPAExecutor.
244     * @throws JPAExecutorException thrown if an jpa executor failed
245     */
246    public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException {
247        EntityManager em = getEntityManager();
248        Instrumentation.Cron cron = new Instrumentation.Cron();
249        try {
250            LOG.trace("Executing JPAExecutor [{0}]", executor.getName());
251            if (instr != null) {
252                instr.incr(INSTRUMENTATION_GROUP_JPA, executor.getName(), 1);
253            }
254            cron.start();
255            em.getTransaction().begin();
256            T t = executor.execute(em);
257            if (em.getTransaction().isActive()) {
258                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
259                    throw new RuntimeException("Skipping Commit for Failover Testing");
260                }
261
262                em.getTransaction().commit();
263            }
264            return t;
265        }
266        catch (PersistenceException e) {
267            throw new JPAExecutorException(ErrorCode.E0603, e);
268        }
269        finally {
270            cron.stop();
271            if (instr != null) {
272                instr.addCron(INSTRUMENTATION_GROUP_JPA, executor.getName(), cron);
273            }
274            try {
275                if (em.getTransaction().isActive()) {
276                    LOG.warn("JPAExecutor [{0}] ended with an active transaction, rolling back", executor.getName());
277                    em.getTransaction().rollback();
278                }
279            }
280            catch (Exception ex) {
281                LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex
282                        .getMessage(), ex);
283            }
284            try {
285                if (em.isOpen()) {
286                    em.close();
287                }
288                else {
289                    LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName());
290                }
291            }
292            catch (Exception ex) {
293                LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex
294                        .getMessage(), ex);
295            }
296        }
297    }
298
299    /**
300     * Execute an UPDATE query
301     * @param namedQueryName the name of query to be executed
302     * @param query query instance to be executed
303     * @param em Entity Manager
304     * @return Integer that query returns, which corresponds to the number of rows updated
305     * @throws JPAExecutorException
306     */
307    public int executeUpdate(String namedQueryName, Query query, EntityManager em) throws JPAExecutorException {
308        Instrumentation.Cron cron = new Instrumentation.Cron();
309        try {
310
311            LOG.trace("Executing Update/Delete Query [{0}]", namedQueryName);
312            if (instr != null) {
313                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
314            }
315            cron.start();
316            em.getTransaction().begin();
317            int ret = query.executeUpdate();
318            if (em.getTransaction().isActive()) {
319                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
320                    throw new RuntimeException("Skipping Commit for Failover Testing");
321                }
322                em.getTransaction().commit();
323            }
324            return ret;
325        }
326        catch (PersistenceException e) {
327            throw new JPAExecutorException(ErrorCode.E0603, e);
328        }
329        finally {
330            processFinally(em, cron, namedQueryName, true);
331        }
332    }
333
334    public static class QueryEntry<E extends Enum<E>> {
335        E namedQuery;
336        Query query;
337
338        public QueryEntry(E namedQuery, Query query) {
339            this.namedQuery = namedQuery;
340            this.query = query;
341        }
342
343        public Query getQuery() {
344            return this.query;
345        }
346
347        public E getQueryName() {
348            return this.namedQuery;
349        }
350    }
351
352    private void processFinally(EntityManager em, Instrumentation.Cron cron, String name, boolean checkActive) {
353        cron.stop();
354        if (instr != null) {
355            instr.addCron(INSTRUMENTATION_GROUP_JPA, name, cron);
356        }
357        if (checkActive) {
358            try {
359                if (em.getTransaction().isActive()) {
360                    LOG.warn("[{0}] ended with an active transaction, rolling back", name);
361                    em.getTransaction().rollback();
362                }
363            }
364            catch (Exception ex) {
365                LOG.warn("Could not check/rollback transaction after [{0}], {1}", name,
366                        ex.getMessage(), ex);
367            }
368        }
369        try {
370            if (em.isOpen()) {
371                em.close();
372            }
373            else {
374                LOG.warn("[{0}] closed the EntityManager, it should not!", name);
375            }
376        }
377        catch (Exception ex) {
378            LOG.warn("Could not close EntityManager after [{0}], {1}", name, ex.getMessage(), ex);
379        }
380    }
381
382    /**
383     * Execute multiple update/insert queries in one transaction
384     * @param insertBeans list of beans to be inserted
385     * @param updateQueryList list of update queries
386     * @param deleteBeans list of beans to be deleted
387     * @param em Entity Manager
388     * @throws JPAExecutorException
389     */
390    public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertBeans, List<QueryEntry> updateQueryList,
391            Collection<JsonBean> deleteBeans, EntityManager em) throws JPAExecutorException {
392        Instrumentation.Cron cron = new Instrumentation.Cron();
393        try {
394
395            LOG.trace("Executing Queries in Batch");
396            cron.start();
397            em.getTransaction().begin();
398            if (updateQueryList != null && updateQueryList.size() > 0) {
399                for (QueryEntry q : updateQueryList) {
400                    if (instr != null) {
401                        instr.incr(INSTRUMENTATION_GROUP_JPA, q.getQueryName().name(), 1);
402                    }
403                    q.getQuery().executeUpdate();
404                }
405            }
406            if (insertBeans != null && insertBeans.size() > 0) {
407                for (JsonBean bean : insertBeans) {
408                    em.persist(bean);
409                }
410            }
411            if (deleteBeans != null && deleteBeans.size() > 0) {
412                for (JsonBean bean : deleteBeans) {
413                    em.remove(em.merge(bean));
414                }
415            }
416            if (em.getTransaction().isActive()) {
417                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
418                    throw new RuntimeException("Skipping Commit for Failover Testing");
419                }
420                em.getTransaction().commit();
421            }
422        }
423        catch (PersistenceException e) {
424            throw new JPAExecutorException(ErrorCode.E0603, e);
425        }
426        finally {
427            processFinally(em, cron, "batchqueryexecutor", true);
428        }
429    }
430
431    /**
432     * Execute a SELECT query
433     * @param namedQueryName the name of query to be executed
434     * @param query query instance to be executed
435     * @param em Entity Manager
436     * @return object that matches the query
437     */
438    public Object executeGet(String namedQueryName, Query query, EntityManager em) {
439        Instrumentation.Cron cron = new Instrumentation.Cron();
440        try {
441
442            LOG.trace("Executing Select Query to Get a Single row  [{0}]", namedQueryName);
443            if (instr != null) {
444                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
445            }
446
447            cron.start();
448            Object obj = null;
449            try {
450                obj = query.getSingleResult();
451            }
452            catch (NoResultException e) {
453                // return null when no matched result
454            }
455            return obj;
456        }
457        finally {
458            processFinally(em, cron, namedQueryName, false);
459        }
460    }
461
462    /**
463     * Execute a SELECT query to get list of results
464     * @param namedQueryName the name of query to be executed
465     * @param query query instance to be executed
466     * @param em Entity Manager
467     * @return list containing results that match the query
468     */
469    public List<?> executeGetList(String namedQueryName, Query query, EntityManager em) {
470        Instrumentation.Cron cron = new Instrumentation.Cron();
471        try {
472
473            LOG.trace("Executing Select Query to Get Multiple Rows [{0}]", namedQueryName);
474            if (instr != null) {
475                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
476            }
477
478            cron.start();
479            List<?> resultList = null;
480            try {
481                resultList = query.getResultList();
482            }
483            catch (NoResultException e) {
484                // return null when no matched result
485            }
486            return resultList;
487        }
488        finally {
489            processFinally(em, cron, namedQueryName, false);
490        }
491    }
492
493    /**
494     * Return an EntityManager. Used by the StoreService. Once the StoreService is removed this method must be removed.
495     *
496     * @return an entity manager
497     */
498    public EntityManager getEntityManager() {
499        return factory.createEntityManager();
500    }
501
502}