This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.IOException;
022import java.text.MessageFormat;
023import java.util.Collection;
024import java.util.List;
025import java.util.Properties;
026
027import javax.persistence.EntityManager;
028import javax.persistence.EntityManagerFactory;
029import javax.persistence.NoResultException;
030import javax.persistence.Persistence;
031import javax.persistence.PersistenceException;
032import javax.persistence.Query;
033
034import org.apache.commons.dbcp.BasicDataSource;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.oozie.BundleActionBean;
037import org.apache.oozie.BundleJobBean;
038import org.apache.oozie.CoordinatorActionBean;
039import org.apache.oozie.CoordinatorJobBean;
040import org.apache.oozie.ErrorCode;
041import org.apache.oozie.FaultInjection;
042import org.apache.oozie.SLAEventBean;
043import org.apache.oozie.WorkflowActionBean;
044import org.apache.oozie.WorkflowJobBean;
045import org.apache.oozie.client.rest.JsonBean;
046import org.apache.oozie.client.rest.JsonSLAEvent;
047import org.apache.oozie.compression.CodecFactory;
048import org.apache.oozie.executor.jpa.JPAExecutor;
049import org.apache.oozie.executor.jpa.JPAExecutorException;
050import org.apache.oozie.sla.SLARegistrationBean;
051import org.apache.oozie.sla.SLASummaryBean;
052import org.apache.oozie.util.IOUtils;
053import org.apache.oozie.util.Instrumentable;
054import org.apache.oozie.util.Instrumentation;
055import org.apache.oozie.util.XLog;
056import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
057import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
058
059/**
060 * Service that manages JPA and executes {@link JPAExecutor}.
061 */
062@SuppressWarnings("deprecation")
063public class JPAService implements Service, Instrumentable {
    /** Instrumentation group under which this service records its counters and timers. */
    private static final String INSTRUMENTATION_GROUP_JPA = "jpa";

    /** Configuration key for the database schema name. */
    public static final String CONF_DB_SCHEMA = "oozie.db.schema.name";

    /** Prefix shared by all configuration keys of this service. */
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService.";
    /** JDBC URL; {@link #init(Services)} requires the form <code>jdbc:[VENDOR]:...</code>. */
    public static final String CONF_URL = CONF_PREFIX + "jdbc.url";
    /** JDBC driver class name, passed to the connection pool as <code>DriverClassName</code>. */
    public static final String CONF_DRIVER = CONF_PREFIX + "jdbc.driver";
    /** Database user name, passed to the connection pool as <code>Username</code>. */
    public static final String CONF_USERNAME = CONF_PREFIX + "jdbc.username";
    /** Database password, passed to the connection pool as <code>Password</code>. */
    public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password";
    /** Value used for the <code>openjpa.ConnectionDriverName</code> property. */
    public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source";
    /** Extra comma separated connection properties appended to the generated ones. */
    public static final String CONF_CONN_PROPERTIES = CONF_PREFIX + "connection.properties";
    /** Maximum number of active connections, passed to the connection pool as <code>MaxActive</code>. */
    public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn";
    /** When true, OpenJPA is asked to build the schema and connection validation is disabled. */
    public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema";
    /** When true (and schema creation is off), connections are validated on borrow, on return and while idle. */
    public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection";
    /** Value used for the pool's <code>timeBetweenEvictionRunsMillis</code> when validation is enabled. */
    public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval";
    /** Value used for the pool's <code>numTestsPerEvictionRun</code> when validation is enabled. */
    public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num";


    /** Entity manager factory; created in {@link #init(Services)} and closed in {@link #destroy()}. */
    private EntityManagerFactory factory;
    /** Instrumentation handle set by {@link #instrument(Instrumentation)}; null until then. */
    private Instrumentation instr;

    /** Logger of this service; assigned in {@link #init(Services)}. */
    private static XLog LOG;
086
087    /**
088     * Return the public interface of the service.
089     *
090     * @return {@link JPAService}.
091     */
092    public Class<? extends Service> getInterface() {
093        return JPAService.class;
094    }
095
096    @Override
097    public void instrument(Instrumentation instr) {
098        this.instr = instr;
099
100        final BasicDataSource dataSource = getBasicDataSource();
101        if (dataSource != null) {
102            instr.addSampler("jdbc", "connections.active", 60, 1, new Instrumentation.Variable<Long>() {
103                @Override
104                public Long getValue() {
105                    return (long) dataSource.getNumActive();
106                }
107            });
108            instr.addSampler("jdbc", "connections.idle", 60, 1, new Instrumentation.Variable<Long>() {
109                @Override
110                public Long getValue() {
111                    return (long) dataSource.getNumIdle();
112                }
113            });
114        }
115    }
116
117    private BasicDataSource getBasicDataSource() {
118        // Get the BasicDataSource object; it could be wrapped in a DecoratingDataSource
119        // It might also not be a BasicDataSource if the user configured something different
120        BasicDataSource basicDataSource = null;
121        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory;
122        Object connectionFactory = spi.getConfiguration().getConnectionFactory();
123        if (connectionFactory instanceof DecoratingDataSource) {
124            DecoratingDataSource decoratingDataSource = (DecoratingDataSource) connectionFactory;
125            basicDataSource = (BasicDataSource) decoratingDataSource.getInnermostDelegate();
126        } else if (connectionFactory instanceof BasicDataSource) {
127            basicDataSource = (BasicDataSource) connectionFactory;
128        }
129        return basicDataSource;
130    }
131
132    /**
133     * Initializes the {@link JPAService}.
134     *
135     * @param services services instance.
136     */
137    public void init(Services services) throws ServiceException {
138        LOG = XLog.getLog(JPAService.class);
139        Configuration conf = services.getConf();
140        String dbSchema = ConfigurationService.get(conf, CONF_DB_SCHEMA);
141        String url = ConfigurationService.get(conf, CONF_URL);
142        String driver = ConfigurationService.get(conf, CONF_DRIVER);
143        String user = ConfigurationService.get(conf, CONF_USERNAME);
144        String password = ConfigurationService.get(conf, CONF_PASSWORD).trim();
145        String maxConn = ConfigurationService.get(conf, CONF_MAX_ACTIVE_CONN).trim();
146        String dataSource = ConfigurationService.get(conf, CONF_CONN_DATA_SOURCE);
147        String connPropsConfig = ConfigurationService.get(conf, CONF_CONN_PROPERTIES);
148        boolean autoSchemaCreation = ConfigurationService.getBoolean(conf, CONF_CREATE_DB_SCHEMA);
149        boolean validateDbConn = ConfigurationService.getBoolean(conf, CONF_VALIDATE_DB_CONN);
150        String evictionInterval = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
151        String evictionNum = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_NUM).trim();
152
153        if (!url.startsWith("jdbc:")) {
154            throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'");
155        }
156        String dbType = url.substring("jdbc:".length());
157        if (dbType.indexOf(":") <= 0) {
158            throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
159        }
160        dbType = dbType.substring(0, dbType.indexOf(":"));
161
162        String persistentUnit = "oozie-" + dbType;
163
164        // Checking existince of ORM file for DB type
165        String ormFile = "META-INF/" + persistentUnit + "-orm.xml";
166        try {
167            IOUtils.getResourceAsStream(ormFile, -1);
168        }
169        catch (IOException ex) {
170            throw new ServiceException(ErrorCode.E0609, dbType, ormFile);
171        }
172
173        String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
174        connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
175        Properties props = new Properties();
176        if (autoSchemaCreation) {
177            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
178            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
179        }
180        else if (validateDbConn) {
181            // validation can be done only if the schema already exist, else a
182            // connection cannot be obtained to create the schema.
183            String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
184            String num = "numTestsPerEvictionRun=" + evictionNum;
185            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
186            connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN";
187            connProps = MessageFormat.format(connProps, dbSchema);
188        }
189        else {
190            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
191        }
192        if (connPropsConfig != null) {
193            connProps += "," + connPropsConfig;
194        }
195        props.setProperty("openjpa.ConnectionProperties", connProps);
196
197        props.setProperty("openjpa.ConnectionDriverName", dataSource);
198
199        factory = Persistence.createEntityManagerFactory(persistentUnit, props);
200
201        EntityManager entityManager = getEntityManager();
202        entityManager.find(WorkflowActionBean.class, 1);
203        entityManager.find(WorkflowJobBean.class, 1);
204        entityManager.find(CoordinatorActionBean.class, 1);
205        entityManager.find(CoordinatorJobBean.class, 1);
206        entityManager.find(SLAEventBean.class, 1);
207        entityManager.find(JsonSLAEvent.class, 1);
208        entityManager.find(BundleJobBean.class, 1);
209        entityManager.find(BundleActionBean.class, 1);
210        entityManager.find(SLARegistrationBean.class, 1);
211        entityManager.find(SLASummaryBean.class, 1);
212
213        LOG.info(XLog.STD, "All entities initialized");
214        // need to use a pseudo no-op transaction so all entities, datasource
215        // and connection pool are initialized one time only
216        entityManager.getTransaction().begin();
217        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory;
218        // Mask the password with '***'
219        String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
220        LOG.info("JPA configuration: {0}", logMsg);
221        entityManager.getTransaction().commit();
222        entityManager.close();
223        try {
224            CodecFactory.initialize(conf);
225        }
226        catch (Exception ex) {
227            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex);
228        }
229    }
230
231    /**
232     * Destroy the JPAService
233     */
234    public void destroy() {
235        if (factory != null && factory.isOpen()) {
236            factory.close();
237        }
238    }
239
240    /**
241     * Execute a {@link JPAExecutor}.
242     *
243     * @param executor JPAExecutor to execute.
244     * @return return value of the JPAExecutor.
245     * @throws JPAExecutorException thrown if an jpa executor failed
246     */
247    public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException {
248        EntityManager em = getEntityManager();
249        Instrumentation.Cron cron = new Instrumentation.Cron();
250        try {
251            LOG.trace("Executing JPAExecutor [{0}]", executor.getName());
252            if (instr != null) {
253                instr.incr(INSTRUMENTATION_GROUP_JPA, executor.getName(), 1);
254            }
255            cron.start();
256            em.getTransaction().begin();
257            T t = executor.execute(em);
258            if (em.getTransaction().isActive()) {
259                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
260                    throw new RuntimeException("Skipping Commit for Failover Testing");
261                }
262
263                em.getTransaction().commit();
264            }
265            return t;
266        }
267        catch (PersistenceException e) {
268            throw new JPAExecutorException(ErrorCode.E0603, e);
269        }
270        finally {
271            cron.stop();
272            if (instr != null) {
273                instr.addCron(INSTRUMENTATION_GROUP_JPA, executor.getName(), cron);
274            }
275            try {
276                if (em.getTransaction().isActive()) {
277                    LOG.warn("JPAExecutor [{0}] ended with an active transaction, rolling back", executor.getName());
278                    em.getTransaction().rollback();
279                }
280            }
281            catch (Exception ex) {
282                LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex
283                        .getMessage(), ex);
284            }
285            try {
286                if (em.isOpen()) {
287                    em.close();
288                }
289                else {
290                    LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName());
291                }
292            }
293            catch (Exception ex) {
294                LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex
295                        .getMessage(), ex);
296            }
297        }
298    }
299
300    /**
301     * Execute an UPDATE query
302     * @param namedQueryName the name of query to be executed
303     * @param query query instance to be executed
304     * @param em Entity Manager
305     * @return Integer that query returns, which corresponds to the number of rows updated
306     * @throws JPAExecutorException
307     */
308    public int executeUpdate(String namedQueryName, Query query, EntityManager em) throws JPAExecutorException {
309        Instrumentation.Cron cron = new Instrumentation.Cron();
310        try {
311
312            LOG.trace("Executing Update/Delete Query [{0}]", namedQueryName);
313            if (instr != null) {
314                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
315            }
316            cron.start();
317            em.getTransaction().begin();
318            int ret = query.executeUpdate();
319            if (em.getTransaction().isActive()) {
320                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
321                    throw new RuntimeException("Skipping Commit for Failover Testing");
322                }
323                em.getTransaction().commit();
324            }
325            return ret;
326        }
327        catch (PersistenceException e) {
328            throw new JPAExecutorException(ErrorCode.E0603, e);
329        }
330        finally {
331            processFinally(em, cron, namedQueryName, true);
332        }
333    }
334
335    public static class QueryEntry<E extends Enum<E>> {
336        E namedQuery;
337        Query query;
338
339        public QueryEntry(E namedQuery, Query query) {
340            this.namedQuery = namedQuery;
341            this.query = query;
342        }
343
344        public Query getQuery() {
345            return this.query;
346        }
347
348        public E getQueryName() {
349            return this.namedQuery;
350        }
351    }
352
353    private void processFinally(EntityManager em, Instrumentation.Cron cron, String name, boolean checkActive) {
354        cron.stop();
355        if (instr != null) {
356            instr.addCron(INSTRUMENTATION_GROUP_JPA, name, cron);
357        }
358        if (checkActive) {
359            try {
360                if (em.getTransaction().isActive()) {
361                    LOG.warn("[{0}] ended with an active transaction, rolling back", name);
362                    em.getTransaction().rollback();
363                }
364            }
365            catch (Exception ex) {
366                LOG.warn("Could not check/rollback transaction after [{0}], {1}", name,
367                        ex.getMessage(), ex);
368            }
369        }
370        try {
371            if (em.isOpen()) {
372                em.close();
373            }
374            else {
375                LOG.warn("[{0}] closed the EntityManager, it should not!", name);
376            }
377        }
378        catch (Exception ex) {
379            LOG.warn("Could not close EntityManager after [{0}], {1}", name, ex.getMessage(), ex);
380        }
381    }
382
383    /**
384     * Execute multiple update/insert queries in one transaction
385     * @param insertBeans list of beans to be inserted
386     * @param updateQueryList list of update queries
387     * @param deleteBeans list of beans to be deleted
388     * @param em Entity Manager
389     * @throws JPAExecutorException
390     */
391    public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertBeans, List<QueryEntry> updateQueryList,
392            Collection<JsonBean> deleteBeans, EntityManager em) throws JPAExecutorException {
393        Instrumentation.Cron cron = new Instrumentation.Cron();
394        try {
395
396            LOG.trace("Executing Queries in Batch");
397            cron.start();
398            em.getTransaction().begin();
399            if (updateQueryList != null && updateQueryList.size() > 0) {
400                for (QueryEntry q : updateQueryList) {
401                    if (instr != null) {
402                        instr.incr(INSTRUMENTATION_GROUP_JPA, q.getQueryName().name(), 1);
403                    }
404                    q.getQuery().executeUpdate();
405                }
406            }
407            if (insertBeans != null && insertBeans.size() > 0) {
408                for (JsonBean bean : insertBeans) {
409                    em.persist(bean);
410                }
411            }
412            if (deleteBeans != null && deleteBeans.size() > 0) {
413                for (JsonBean bean : deleteBeans) {
414                    em.remove(em.merge(bean));
415                }
416            }
417            if (em.getTransaction().isActive()) {
418                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
419                    throw new RuntimeException("Skipping Commit for Failover Testing");
420                }
421                em.getTransaction().commit();
422            }
423        }
424        catch (PersistenceException e) {
425            throw new JPAExecutorException(ErrorCode.E0603, e);
426        }
427        finally {
428            processFinally(em, cron, "batchqueryexecutor", true);
429        }
430    }
431
432    /**
433     * Execute a SELECT query
434     * @param namedQueryName the name of query to be executed
435     * @param query query instance to be executed
436     * @param em Entity Manager
437     * @return object that matches the query
438     */
439    public Object executeGet(String namedQueryName, Query query, EntityManager em) {
440        Instrumentation.Cron cron = new Instrumentation.Cron();
441        try {
442
443            LOG.trace("Executing Select Query to Get a Single row  [{0}]", namedQueryName);
444            if (instr != null) {
445                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
446            }
447
448            cron.start();
449            Object obj = null;
450            try {
451                obj = query.getSingleResult();
452            }
453            catch (NoResultException e) {
454                // return null when no matched result
455            }
456            return obj;
457        }
458        finally {
459            processFinally(em, cron, namedQueryName, false);
460        }
461    }
462
463    /**
464     * Execute a SELECT query to get list of results
465     * @param namedQueryName the name of query to be executed
466     * @param query query instance to be executed
467     * @param em Entity Manager
468     * @return list containing results that match the query
469     */
470    public List<?> executeGetList(String namedQueryName, Query query, EntityManager em) {
471        Instrumentation.Cron cron = new Instrumentation.Cron();
472        try {
473
474            LOG.trace("Executing Select Query to Get Multiple Rows [{0}]", namedQueryName);
475            if (instr != null) {
476                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
477            }
478
479            cron.start();
480            List<?> resultList = null;
481            try {
482                resultList = query.getResultList();
483            }
484            catch (NoResultException e) {
485                // return null when no matched result
486            }
487            return resultList;
488        }
489        finally {
490            processFinally(em, cron, namedQueryName, false);
491        }
492    }
493
494    /**
495     * Return an EntityManager. Used by the StoreService. Once the StoreService is removed this method must be removed.
496     *
497     * @return an entity manager
498     */
499    public EntityManager getEntityManager() {
500        return factory.createEntityManager();
501    }
502
503}