001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.IOException;
022import java.text.MessageFormat;
023import java.util.Collection;
024import java.util.List;
025import java.util.Properties;
026import java.util.concurrent.Callable;
027
028import javax.persistence.EntityManager;
029import javax.persistence.EntityManagerFactory;
030import javax.persistence.EntityTransaction;
031import javax.persistence.NoResultException;
032import javax.persistence.Persistence;
033import javax.persistence.Query;
034
035import org.apache.commons.collections.CollectionUtils;
036import org.apache.commons.dbcp.BasicDataSource;
037import org.apache.commons.lang.StringUtils;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.oozie.BundleActionBean;
040import org.apache.oozie.BundleJobBean;
041import org.apache.oozie.CoordinatorActionBean;
042import org.apache.oozie.CoordinatorJobBean;
043import org.apache.oozie.ErrorCode;
044import org.apache.oozie.FaultInjection;
045import org.apache.oozie.SLAEventBean;
046import org.apache.oozie.WorkflowActionBean;
047import org.apache.oozie.WorkflowJobBean;
048import org.apache.oozie.client.rest.JsonBean;
049import org.apache.oozie.client.rest.JsonSLAEvent;
050import org.apache.oozie.command.SkipCommitFaultInjection;
051import org.apache.oozie.compression.CodecFactory;
052import org.apache.oozie.executor.jpa.JPAExecutor;
053import org.apache.oozie.executor.jpa.JPAExecutorException;
054import org.apache.oozie.sla.SLARegistrationBean;
055import org.apache.oozie.sla.SLASummaryBean;
056import org.apache.oozie.util.IOUtils;
057import org.apache.oozie.util.Instrumentable;
058import org.apache.oozie.util.Instrumentation;
059import org.apache.oozie.util.XLog;
060import org.apache.oozie.util.db.OperationRetryHandler;
061import org.apache.oozie.util.db.PersistenceExceptionSubclassFilterRetryPredicate;
062import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
063import org.apache.openjpa.persistence.InvalidStateException;
064import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
065
066/**
067 * Service that manages JPA and executes {@link JPAExecutor}.
068 */
069@SuppressWarnings("deprecation")
070public class JPAService implements Service, Instrumentable {
071    private static final String INSTRUMENTATION_GROUP_JPA = "jpa";
072
073    public static final long DEFAULT_INITIAL_WAIT_TIME = 100;
074    public static final long DEFAULT_MAX_WAIT_TIME = 30_000;
075    public static final int DEFAULT_MAX_RETRY_COUNT = 1;
076
077    public static final String CONF_DB_SCHEMA = "oozie.db.schema.name";
078
079    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService.";
080    public static final String CONF_URL = CONF_PREFIX + "jdbc.url";
081    public static final String CONF_DRIVER = CONF_PREFIX + "jdbc.driver";
082    public static final String CONF_USERNAME = CONF_PREFIX + "jdbc.username";
083    public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password";
084    public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source";
085    public static final String CONF_CONN_PROPERTIES = CONF_PREFIX + "connection.properties";
086    public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn";
087    public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema";
088    public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection";
089    public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval";
090    public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num";
091    public static final String CONF_OPENJPA_BROKER_IMPL = CONF_PREFIX + "openjpa.BrokerImpl";
092    public static final String INITIAL_WAIT_TIME = CONF_PREFIX + "retry.initial-wait-time.ms";
093    public static final String MAX_WAIT_TIME = CONF_PREFIX + "maximum-wait-time.ms";
094    public static final String MAX_RETRY_COUNT = CONF_PREFIX + "retry.max-retries";
095    public static final String SKIP_COMMIT_FAULT_INJECTION_CLASS = SkipCommitFaultInjection.class.getName();
096
097    private EntityManagerFactory factory;
098    private Instrumentation instr;
099
100    private static XLog LOG;
101    private OperationRetryHandler retryHandler;
102
103    /**
104     * Return the public interface of the service.
105     *
106     * @return {@link JPAService}.
107     */
108    public Class<? extends Service> getInterface() {
109        return JPAService.class;
110    }
111
112    @Override
113    public void instrument(final Instrumentation instr) {
114        this.instr = instr;
115
116        final BasicDataSource dataSource = getBasicDataSource();
117        if (dataSource != null) {
118            instr.addSampler("jdbc", "connections.active", 60, 1, new Instrumentation.Variable<Long>() {
119                @Override
120                public Long getValue() {
121                    return (long) dataSource.getNumActive();
122                }
123            });
124            instr.addSampler("jdbc", "connections.idle", 60, 1, new Instrumentation.Variable<Long>() {
125                @Override
126                public Long getValue() {
127                    return (long) dataSource.getNumIdle();
128                }
129            });
130        }
131    }
132
133    private BasicDataSource getBasicDataSource() {
134        // Get the BasicDataSource object; it could be wrapped in a DecoratingDataSource
135        // It might also not be a BasicDataSource if the user configured something different
136        BasicDataSource basicDataSource = null;
137        final OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory;
138        final Object connectionFactory = spi.getConfiguration().getConnectionFactory();
139        if (connectionFactory instanceof DecoratingDataSource) {
140            final DecoratingDataSource decoratingDataSource = (DecoratingDataSource) connectionFactory;
141            basicDataSource = (BasicDataSource) decoratingDataSource.getInnermostDelegate();
142        } else if (connectionFactory instanceof BasicDataSource) {
143            basicDataSource = (BasicDataSource) connectionFactory;
144        }
145        return basicDataSource;
146    }
147
148    /**
149     * Initializes the {@link JPAService}.
150     *
151     * @param services services instance.
152     */
153    public void init(final Services services) throws ServiceException {
154        LOG = XLog.getLog(JPAService.class);
155        final Configuration conf = services.getConf();
156        final String dbSchema = ConfigurationService.get(conf, CONF_DB_SCHEMA);
157        String url = ConfigurationService.get(conf, CONF_URL);
158        final String driver = ConfigurationService.get(conf, CONF_DRIVER);
159        final String user = ConfigurationService.get(conf, CONF_USERNAME);
160        final String password = ConfigurationService.getPassword(conf, CONF_PASSWORD).trim();
161        final String maxConn = ConfigurationService.get(conf, CONF_MAX_ACTIVE_CONN).trim();
162        final String dataSource = ConfigurationService.get(conf, CONF_CONN_DATA_SOURCE);
163        final String connPropsConfig = ConfigurationService.get(conf, CONF_CONN_PROPERTIES);
164        final String brokerImplConfig = ConfigurationService.get(conf, CONF_OPENJPA_BROKER_IMPL);
165        final boolean autoSchemaCreation = ConfigurationService.getBoolean(conf, CONF_CREATE_DB_SCHEMA);
166        final boolean validateDbConn = ConfigurationService.getBoolean(conf, CONF_VALIDATE_DB_CONN);
167        final String evictionInterval = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
168        final String evictionNum = ConfigurationService.get(conf, CONF_VALIDATE_DB_CONN_EVICTION_NUM).trim();
169
170        if (!url.startsWith("jdbc:")) {
171            throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'");
172        }
173        String dbType = url.substring("jdbc:".length());
174        if (dbType.indexOf(":") <= 0) {
175            throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
176        }
177        dbType = dbType.substring(0, dbType.indexOf(":"));
178
179        final String persistentUnit = "oozie-" + dbType;
180
181        // Checking existince of ORM file for DB type
182        final String ormFile = "META-INF/" + persistentUnit + "-orm.xml";
183        try {
184            IOUtils.getResourceAsStream(ormFile, -1);
185        }
186        catch (final IOException ex) {
187            throw new ServiceException(ErrorCode.E0609, dbType, ormFile);
188        }
189
190        // support for mysql replication urls "jdbc:mysql:replication://master:port,slave:port[,slave:port]/db"
191        if (url.startsWith("jdbc:mysql:replication")) {
192            url = "\"".concat(url).concat("\"");
193            LOG.info("A jdbc replication url is provided. Url: [{0}]", url);
194        }
195
196
197        String connProps = "DriverClassName={0},Url={1},MaxActive={2}";
198        connProps = MessageFormat.format(connProps, driver, url, maxConn);
199        final Properties props = new Properties();
200        if (autoSchemaCreation) {
201            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
202            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
203        }
204        else if (validateDbConn) {
205            // validation can be done only if the schema already exist, else a
206            // connection cannot be obtained to create the schema.
207            final String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
208            final String num = "numTestsPerEvictionRun=" + evictionNum;
209            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
210            connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN";
211            connProps = MessageFormat.format(connProps, dbSchema);
212        }
213        else {
214            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
215        }
216        if (connPropsConfig != null) {
217            connProps += "," + connPropsConfig;
218        }
219        props.setProperty("openjpa.ConnectionProperties", connProps);
220        props.setProperty("openjpa.ConnectionPassword", password);
221        props.setProperty("openjpa.ConnectionUserName", user);
222        props.setProperty("openjpa.ConnectionDriverName", dataSource);
223        if (!StringUtils.isEmpty(brokerImplConfig)) {
224            props.setProperty("openjpa.BrokerImpl", brokerImplConfig);
225            LOG.info("Setting openjpa.BrokerImpl to {0}", brokerImplConfig);
226        }
227
228        initRetryHandler();
229
230        factory = Persistence.createEntityManagerFactory(persistentUnit, props);
231
232        final EntityManager entityManager = getEntityManager();
233        findRetrying(entityManager, WorkflowActionBean.class, 1);
234        findRetrying(entityManager, WorkflowJobBean.class, 1);
235        findRetrying(entityManager, CoordinatorActionBean.class, 1);
236        findRetrying(entityManager, CoordinatorJobBean.class, 1);
237        findRetrying(entityManager, SLAEventBean.class, 1);
238        findRetrying(entityManager, JsonSLAEvent.class, 1);
239        findRetrying(entityManager, BundleActionBean.class, 1);
240        findRetrying(entityManager, BundleJobBean.class, 1);
241        findRetrying(entityManager, SLARegistrationBean.class, 1);
242        findRetrying(entityManager, SLASummaryBean.class, 1);
243
244        LOG.info(XLog.STD, "All entities initialized");
245        // need to use a pseudo no-op transaction so all entities, datasource
246        // and connection pool are initialized one time only
247        entityManager.getTransaction().begin();
248        final OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory;
249        // Mask the password with '***'
250        final String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
251        LOG.info("JPA configuration: {0}", logMsg);
252        entityManager.getTransaction().commit();
253        entityManager.close();
254        try {
255            CodecFactory.initialize(conf);
256        }
257        catch (final Exception ex) {
258            throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex);
259        }
260
261    }
262
263    private void initRetryHandler() {
264        final long initialWaitTime = ConfigurationService.getInt(INITIAL_WAIT_TIME, (int) DEFAULT_INITIAL_WAIT_TIME);
265        final long maxWaitTime = ConfigurationService.getInt(MAX_WAIT_TIME, (int) DEFAULT_MAX_WAIT_TIME);
266        final int maxRetryCount = ConfigurationService.getInt(MAX_RETRY_COUNT, DEFAULT_MAX_RETRY_COUNT);
267
268        LOG.info(XLog.STD, "Failing database operations will be retried {0} times, with an initial sleep time of {1} ms,"
269                + "max sleep time {2} ms", maxRetryCount, initialWaitTime, maxWaitTime);
270        retryHandler = new OperationRetryHandler(maxRetryCount,
271                initialWaitTime,
272                maxWaitTime,
273                new PersistenceExceptionSubclassFilterRetryPredicate());
274    }
275
276    private void findRetrying(final EntityManager entityManager, final Class entityClass, final int primaryKey)
277            throws ServiceException {
278        try {
279            retryHandler.executeWithRetry(new Callable<Void>() {
280                @Override
281                public Void call() throws Exception {
282                    if (!entityManager.getTransaction().isActive()) {
283                        entityManager.getTransaction().begin();
284                    }
285
286                    entityManager.find(entityClass, primaryKey);
287
288                    if (entityManager.getTransaction().isActive()) {
289                        entityManager.getTransaction().commit();
290                    }
291                    return null;
292                }
293            });
294        }
295        catch (final Exception e) {
296            throw new ServiceException(ErrorCode.E0603, e);
297        }
298    }
299
300    /**
301     * Destroy the JPAService
302     */
303    public void destroy() {
304        if (factory != null && factory.isOpen()) {
305            try {
306                factory.close();
307            }
308            catch (final InvalidStateException ise) {
309                LOG.warn("Cannot close EntityManagerFactory. [ise.message={0}]", ise.getMessage());
310            }
311        }
312    }
313
314    /**
315     * Execute a {@link JPAExecutor}.
316     *
317     * @param executor JPAExecutor to execute.
318     * @return return value of the JPAExecutor.
319     * @throws JPAExecutorException thrown if an jpa executor failed
320     */
321    public <T> T execute(final JPAExecutor<T> executor) throws JPAExecutorException {
322        final EntityManager em = getEntityManager();
323        final Instrumentation.Cron cron = new Instrumentation.Cron();
324        try {
325            LOG.trace("Executing JPAExecutor [{0}]", executor.getName());
326            if (instr != null) {
327                instr.incr(INSTRUMENTATION_GROUP_JPA, executor.getName(), 1);
328            }
329            cron.start();
330
331            return retryHandler.executeWithRetry(new Callable<T>() {
332                @Override
333                public T call() throws Exception {
334                    if (!em.getTransaction().isActive()) {
335                        em.getTransaction().begin();
336                    }
337
338                    final T t = executor.execute(em);
339
340                    checkAndCommit(em.getTransaction());
341
342                    return t;
343                }
344            });
345        }
346        catch (final Exception e) {
347            throw getTargetException(e);
348        }
349        finally {
350            cron.stop();
351            if (instr != null) {
352                instr.addCron(INSTRUMENTATION_GROUP_JPA, executor.getName(), cron);
353            }
354            try {
355                if (em.getTransaction().isActive()) {
356                    LOG.warn("JPAExecutor [{0}] ended with an active transaction, rolling back", executor.getName());
357                    em.getTransaction().rollback();
358                }
359            }
360            catch (final Exception ex) {
361                LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex
362                        .getMessage(), ex);
363            }
364            try {
365                if (em.isOpen()) {
366                    em.close();
367                }
368                else {
369                    LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName());
370                }
371            }
372            catch (final Exception ex) {
373                LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex
374                        .getMessage(), ex);
375            }
376        }
377    }
378
379    private void checkAndCommit(final EntityTransaction tx) throws JPAExecutorException {
380        if (tx.isActive()) {
381            if (FaultInjection.isActive(SKIP_COMMIT_FAULT_INJECTION_CLASS)) {
382                throw new JPAExecutorException(ErrorCode.E0603, "Skipping Commit for Failover Testing");
383            }
384
385            tx.commit();
386        }
387    }
388
389    /**
390     * Execute an UPDATE query
391     * @param namedQueryName the name of query to be executed
392     * @param query query instance to be executed
393     * @param em Entity Manager
394     * @return Integer that query returns, which corresponds to the number of rows updated
395     * @throws JPAExecutorException
396     */
397    public int executeUpdate(final String namedQueryName, final Query query, final EntityManager em) throws JPAExecutorException {
398        final Instrumentation.Cron cron = new Instrumentation.Cron();
399        try {
400
401            LOG.trace("Executing Update/Delete Query [{0}]", namedQueryName);
402            if (instr != null) {
403                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
404            }
405            cron.start();
406
407            return retryHandler.executeWithRetry(new Callable<Integer>() {
408                @Override
409                public Integer call() throws Exception {
410                    if (!em.getTransaction().isActive()) {
411                        em.getTransaction().begin();
412                    }
413                    final int ret = query.executeUpdate();
414
415                    checkAndCommit(em.getTransaction());
416
417                    return ret;
418                }
419            });
420        }
421        catch (final Exception e) {
422            throw getTargetException(e);
423        }
424        finally {
425            processFinally(em, cron, namedQueryName, true);
426        }
427    }
428
429    public static class QueryEntry<E extends Enum<E>> {
430        E namedQuery;
431        Query query;
432
433        public QueryEntry(final E namedQuery, final Query query) {
434            this.namedQuery = namedQuery;
435            this.query = query;
436        }
437
438        public Query getQuery() {
439            return this.query;
440        }
441
442        public E getQueryName() {
443            return this.namedQuery;
444        }
445    }
446
447    private void processFinally(final EntityManager em,
448                                final Instrumentation.Cron cron,
449                                final String name,
450                                final boolean checkActive) {
451        cron.stop();
452        if (instr != null) {
453            instr.addCron(INSTRUMENTATION_GROUP_JPA, name, cron);
454        }
455        if (checkActive) {
456            try {
457                if (em.getTransaction().isActive()) {
458                    LOG.warn("[{0}] ended with an active transaction, rolling back", name);
459                    em.getTransaction().rollback();
460                }
461            }
462            catch (final Exception ex) {
463                LOG.warn("Could not check/rollback transaction after [{0}], {1}", name,
464                        ex.getMessage(), ex);
465            }
466        }
467        try {
468            if (em.isOpen()) {
469                em.close();
470            }
471            else {
472                LOG.warn("[{0}] closed the EntityManager, it should not!", name);
473            }
474        }
475        catch (final Exception ex) {
476            LOG.warn("Could not close EntityManager after [{0}], {1}", name, ex.getMessage(), ex);
477        }
478    }
479
480    /**
481     * Execute multiple update/insert queries in one transaction
482     * @param insertBeans list of beans to be inserted
483     * @param updateQueryList list of update queries
484     * @param deleteBeans list of beans to be deleted
485     * @param em Entity Manager
486     * @throws JPAExecutorException
487     */
488    public void executeBatchInsertUpdateDelete(final Collection<JsonBean> insertBeans, final List<QueryEntry> updateQueryList,
489            final Collection<JsonBean> deleteBeans, final EntityManager em) throws JPAExecutorException {
490        final Instrumentation.Cron cron = new Instrumentation.Cron();
491        try {
492
493            LOG.trace("Executing Queries in Batch");
494            cron.start();
495
496            retryHandler.executeWithRetry(new Callable<Void>() {
497                @Override
498                public Void call() throws Exception {
499                    if (em.getTransaction().isActive()) {
500                        try {
501                            em.getTransaction().rollback();
502                        }
503                        catch (final Exception e) {
504                            LOG.warn("Rollback failed - ignoring");
505                        }
506                    }
507
508                    em.getTransaction().begin();
509
510                    if (CollectionUtils.isNotEmpty(updateQueryList)) {
511                        for (final QueryEntry q : updateQueryList) {
512                            if (instr != null) {
513                                instr.incr(INSTRUMENTATION_GROUP_JPA, q.getQueryName().name(), 1);
514                            }
515                            q.getQuery().executeUpdate();
516                        }
517                    }
518
519                    if (CollectionUtils.isNotEmpty(insertBeans)) {
520                        for (final JsonBean bean : insertBeans) {
521                            em.persist(bean);
522                        }
523                    }
524
525                    if (CollectionUtils.isNotEmpty(deleteBeans)) {
526                        for (final JsonBean bean : deleteBeans) {
527                            em.remove(em.merge(bean));
528                        }
529                    }
530
531                    checkAndCommit(em.getTransaction());
532
533                    return null;
534                }
535            });
536        }
537        catch (final Exception e) {
538            throw getTargetException(e);
539        }
540        finally {
541            processFinally(em, cron, "batchqueryexecutor", true);
542        }
543    }
544
545    /**
546     * Execute a SELECT query
547     * @param namedQueryName the name of query to be executed
548     * @param query query instance to be executed
549     * @param em Entity Manager
550     * @return object that matches the query
551     */
552    public Object executeGet(final String namedQueryName, final Query query, final EntityManager em) throws JPAExecutorException {
553        final Instrumentation.Cron cron = new Instrumentation.Cron();
554        try {
555            LOG.trace("Executing Select Query to Get a Single row  [{0}]", namedQueryName);
556            if (instr != null) {
557                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
558            }
559
560            cron.start();
561
562            return retryHandler.executeWithRetry(new Callable<Object>() {
563                @Override
564                public Object call() throws Exception {
565                    Object obj = null;
566                    try {
567                        obj = query.getSingleResult();
568                    }
569                    catch (final NoResultException e) {
570                        LOG.info("No results found");
571                        // return null when no matched result
572                    }
573                    return obj;
574                }
575            });
576        }
577        catch (final Exception e) {
578            throw getTargetException(e);
579        }
580        finally {
581            processFinally(em, cron, namedQueryName, false);
582        }
583    }
584
585    /**
586     * Execute a SELECT query to get list of results
587     * @param namedQueryName the name of query to be executed
588     * @param query query instance to be executed
589     * @param em Entity Manager
590     * @return list containing results that match the query
591     */
592    public List<?> executeGetList(final String namedQueryName, final Query query, final EntityManager em)
593            throws JPAExecutorException {
594        final Instrumentation.Cron cron = new Instrumentation.Cron();
595        try {
596
597            LOG.trace("Executing Select Query to Get Multiple Rows [{0}]", namedQueryName);
598            if (instr != null) {
599                instr.incr(INSTRUMENTATION_GROUP_JPA, namedQueryName, 1);
600            }
601
602            cron.start();
603
604            return retryHandler.executeWithRetry(new Callable<List<?>>() {
605                @Override
606                public List<?> call() throws Exception {
607                    List<?> resultList = null;
608                    try {
609                        resultList = query.getResultList();
610                    }
611                    catch (final NoResultException e) {
612                        LOG.info("No results found");
613                        // return null when no matched result
614                    }
615                    return resultList;
616                }
617            });
618        }
619        catch (final Exception e) {
620            throw getTargetException(e);
621        }
622        finally {
623            processFinally(em, cron, namedQueryName, false);
624        }
625    }
626
627    /**
628     * Return an EntityManager. Used by the StoreService. Once the StoreService is removed this method must be removed.
629     *
630     * @return an entity manager
631     */
632    public EntityManager getEntityManager() {
633        return factory.createEntityManager();
634    }
635
636    private JPAExecutorException getTargetException(final Exception e) {
637        if (e instanceof JPAExecutorException) {
638            return (JPAExecutorException) e;
639        }
640        else {
641            return new JPAExecutorException(ErrorCode.E0603, e.getMessage());
642        }
643    }
644}