001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.io.IOException;
021    import java.text.MessageFormat;
022    import java.util.Properties;
023    
024    import javax.persistence.EntityManager;
025    import javax.persistence.EntityManagerFactory;
026    import javax.persistence.Persistence;
027    import javax.persistence.PersistenceException;
028    
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.oozie.BundleActionBean;
031    import org.apache.oozie.BundleJobBean;
032    import org.apache.oozie.CoordinatorActionBean;
033    import org.apache.oozie.CoordinatorJobBean;
034    import org.apache.oozie.ErrorCode;
035    import org.apache.oozie.FaultInjection;
036    import org.apache.oozie.SLAEventBean;
037    import org.apache.oozie.WorkflowActionBean;
038    import org.apache.oozie.WorkflowJobBean;
039    import org.apache.oozie.client.rest.JsonBundleJob;
040    import org.apache.oozie.client.rest.JsonCoordinatorAction;
041    import org.apache.oozie.client.rest.JsonCoordinatorJob;
042    import org.apache.oozie.client.rest.JsonSLAEvent;
043    import org.apache.oozie.client.rest.JsonWorkflowAction;
044    import org.apache.oozie.client.rest.JsonWorkflowJob;
045    import org.apache.oozie.executor.jpa.JPAExecutor;
046    import org.apache.oozie.executor.jpa.JPAExecutorException;
047    import org.apache.oozie.sla.SLARegistrationBean;
048    import org.apache.oozie.sla.SLASummaryBean;
049    import org.apache.oozie.util.IOUtils;
050    import org.apache.oozie.util.Instrumentable;
051    import org.apache.oozie.util.Instrumentation;
052    import org.apache.oozie.util.XLog;
053    import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
054    
055    /**
056     * Service that manages JPA and executes {@link JPAExecutor}.
057     */
058    @SuppressWarnings("deprecation")
059    public class JPAService implements Service, Instrumentable {
060        private static final String INSTRUMENTATION_GROUP = "jpa";
061    
062        public static final String CONF_DB_SCHEMA = "oozie.db.schema.name";
063    
064        public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService.";
065        public static final String CONF_URL = CONF_PREFIX + "jdbc.url";
066        public static final String CONF_DRIVER = CONF_PREFIX + "jdbc.driver";
067        public static final String CONF_USERNAME = CONF_PREFIX + "jdbc.username";
068        public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password";
069        public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source";
070    
071        public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn";
072        public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema";
073        public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection";
074        public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval";
075        public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num";
076    
077    
078        private EntityManagerFactory factory;
079        private Instrumentation instr;
080    
081        private static XLog LOG;
082    
083        /**
084         * Return the public interface of the service.
085         *
086         * @return {@link JPAService}.
087         */
088        public Class<? extends Service> getInterface() {
089            return JPAService.class;
090        }
091    
092        @Override
093        public void instrument(Instrumentation instr) {
094            this.instr = instr;
095        }
096    
097        /**
098         * Initializes the {@link JPAService}.
099         *
100         * @param services services instance.
101         */
102        public void init(Services services) throws ServiceException {
103            LOG = XLog.getLog(JPAService.class);
104            Configuration conf = services.getConf();
105            String dbSchema = conf.get(CONF_DB_SCHEMA, "oozie");
106            String url = conf.get(CONF_URL, "jdbc:derby:${oozie.home.dir}/${oozie.db.schema.name}-db;create=true");
107            String driver = conf.get(CONF_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver");
108            String user = conf.get(CONF_USERNAME, "sa");
109            String password = conf.get(CONF_PASSWORD, "").trim();
110            String maxConn = conf.get(CONF_MAX_ACTIVE_CONN, "10").trim();
111            String dataSource = conf.get(CONF_CONN_DATA_SOURCE, "org.apache.commons.dbcp.BasicDataSource");
112            boolean autoSchemaCreation = conf.getBoolean(CONF_CREATE_DB_SCHEMA, true);
113            boolean validateDbConn = conf.getBoolean(CONF_VALIDATE_DB_CONN, false);
114            String evictionInterval = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL, "300000").trim();
115            String evictionNum = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_NUM, "10").trim();
116    
117            if (!url.startsWith("jdbc:")) {
118                throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'");
119            }
120            String dbType = url.substring("jdbc:".length());
121            if (dbType.indexOf(":") <= 0) {
122                throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
123            }
124            dbType = dbType.substring(0, dbType.indexOf(":"));
125    
126            String persistentUnit = "oozie-" + dbType;
127    
128            // Checking existince of ORM file for DB type
129            String ormFile = "META-INF/" + persistentUnit + "-orm.xml";
130            try {
131                IOUtils.getResourceAsStream(ormFile, -1);
132            }
133            catch (IOException ex) {
134                throw new ServiceException(ErrorCode.E0609, dbType, ormFile);
135            }
136    
137            String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
138            connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
139            Properties props = new Properties();
140            if (autoSchemaCreation) {
141                connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
142                props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
143            }
144            else if (validateDbConn) {
145                // validation can be done only if the schema already exist, else a
146                // connection cannot be obtained to create the schema.
147                String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
148                String num = "numTestsPerEvictionRun=" + evictionNum;
149                connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
150                connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN";
151                connProps = MessageFormat.format(connProps, dbSchema);
152            }
153            else {
154                connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
155            }
156            props.setProperty("openjpa.ConnectionProperties", connProps);
157    
158            props.setProperty("openjpa.ConnectionDriverName", dataSource);
159    
160            factory = Persistence.createEntityManagerFactory(persistentUnit, props);
161    
162            EntityManager entityManager = getEntityManager();
163            entityManager.find(WorkflowActionBean.class, 1);
164            entityManager.find(WorkflowJobBean.class, 1);
165            entityManager.find(CoordinatorActionBean.class, 1);
166            entityManager.find(CoordinatorJobBean.class, 1);
167            entityManager.find(JsonWorkflowAction.class, 1);
168            entityManager.find(JsonWorkflowJob.class, 1);
169            entityManager.find(JsonCoordinatorAction.class, 1);
170            entityManager.find(JsonCoordinatorJob.class, 1);
171            entityManager.find(SLAEventBean.class, 1);
172            entityManager.find(JsonSLAEvent.class, 1);
173            entityManager.find(BundleJobBean.class, 1);
174            entityManager.find(JsonBundleJob.class, 1);
175            entityManager.find(BundleActionBean.class, 1);
176            entityManager.find(SLARegistrationBean.class, 1);
177            entityManager.find(SLASummaryBean.class, 1);
178    
179            LOG.info(XLog.STD, "All entities initialized");
180            // need to use a pseudo no-op transaction so all entities, datasource
181            // and connection pool are initialized one time only
182            entityManager.getTransaction().begin();
183            OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory;
184            // Mask the password with '***'
185            String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
186            LOG.info("JPA configuration: {0}", logMsg);
187            entityManager.getTransaction().commit();
188            entityManager.close();
189        }
190    
191        /**
192         * Destroy the JPAService
193         */
194        public void destroy() {
195            if (factory != null && factory.isOpen()) {
196                factory.close();
197            }
198        }
199    
200        /**
201         * Execute a {@link JPAExecutor}.
202         *
203         * @param executor JPAExecutor to execute.
204         * @return return value of the JPAExecutor.
205         * @throws JPAExecutorException thrown if an jpa executor failed
206         */
207        public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException {
208            EntityManager em = getEntityManager();
209            Instrumentation.Cron cron = new Instrumentation.Cron();
210            try {
211                LOG.trace("Executing JPAExecutor [{0}]", executor.getName());
212                if (instr != null) {
213                    instr.incr(INSTRUMENTATION_GROUP, executor.getName(), 1);
214                }
215                cron.start();
216                em.getTransaction().begin();
217                T t = executor.execute(em);
218                if (em.getTransaction().isActive()) {
219                    if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
220                        throw new RuntimeException("Skipping Commit for Failover Testing");
221                    }
222    
223                    em.getTransaction().commit();
224                }
225                return t;
226            }
227            catch (PersistenceException e) {
228                throw new JPAExecutorException(ErrorCode.E0603, e);
229            }
230            finally {
231                cron.stop();
232                if (instr != null) {
233                    instr.addCron(INSTRUMENTATION_GROUP, executor.getName(), cron);
234                }
235                try {
236                    if (em.getTransaction().isActive()) {
237                        LOG.warn("JPAExecutor [{0}] ended with an active transaction, rolling back", executor.getName());
238                        em.getTransaction().rollback();
239                    }
240                }
241                catch (Exception ex) {
242                    LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex
243                            .getMessage(), ex);
244                }
245                try {
246                    if (em.isOpen()) {
247                        em.close();
248                    }
249                    else {
250                        LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName());
251                    }
252                }
253                catch (Exception ex) {
254                    LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex
255                            .getMessage(), ex);
256                }
257            }
258        }
259    
260        /**
261         * Return an EntityManager. Used by the StoreService. Once the StoreService is removed this method must be removed.
262         *
263         * @return an entity manager
264         */
265        EntityManager getEntityManager() {
266            return factory.createEntityManager();
267        }
268    
269    }