This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
import java.io.IOException;
import java.io.InputStream;
import java.text.MessageFormat;
import java.util.Properties;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.FaultInjection;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.rest.JsonBundleJob;
import org.apache.oozie.client.rest.JsonCoordinatorAction;
import org.apache.oozie.client.rest.JsonCoordinatorJob;
import org.apache.oozie.client.rest.JsonSLAEvent;
import org.apache.oozie.client.rest.JsonWorkflowAction;
import org.apache.oozie.client.rest.JsonWorkflowJob;
import org.apache.oozie.executor.jpa.JPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLog;
import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
051
052 /**
053 * Service that manages JPA and executes {@link JPAExecutor}.
054 */
055 public class JPAService implements Service, Instrumentable {
056 private static final String INSTRUMENTATION_GROUP = "jpa";
057
058 public static final String CONF_DB_SCHEMA = "oozie.db.schema.name";
059
060 public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService.";
061 public static final String CONF_URL = CONF_PREFIX + "jdbc.url";
062 public static final String CONF_DRIVER = CONF_PREFIX + "jdbc.driver";
063 public static final String CONF_USERNAME = CONF_PREFIX + "jdbc.username";
064 public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password";
065 public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source";
066
067 public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn";
068 public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema";
069 public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection";
070 public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval";
071 public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num";
072
073
074 private EntityManagerFactory factory;
075 private Instrumentation instr;
076
077 private static XLog LOG;
078
079 /**
080 * Return the public interface of the service.
081 *
082 * @return {@link JPAService}.
083 */
084 public Class<? extends Service> getInterface() {
085 return JPAService.class;
086 }
087
088 @Override
089 public void instrument(Instrumentation instr) {
090 this.instr = instr;
091 }
092
093 /**
094 * Initializes the {@link JPAService}.
095 *
096 * @param services services instance.
097 */
098 public void init(Services services) throws ServiceException {
099 LOG = XLog.getLog(JPAService.class);
100 Configuration conf = services.getConf();
101 String dbSchema = conf.get(CONF_DB_SCHEMA, "oozie");
102 String url = conf.get(CONF_URL, "jdbc:derby:${oozie.home.dir}/${oozie.db.schema.name}-db;create=true");
103 String driver = conf.get(CONF_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver");
104 String user = conf.get(CONF_USERNAME, "sa");
105 String password = conf.get(CONF_PASSWORD, "").trim();
106 String maxConn = conf.get(CONF_MAX_ACTIVE_CONN, "10").trim();
107 String dataSource = conf.get(CONF_CONN_DATA_SOURCE, "org.apache.commons.dbcp.BasicDataSource");
108 boolean autoSchemaCreation = conf.getBoolean(CONF_CREATE_DB_SCHEMA, true);
109 boolean validateDbConn = conf.getBoolean(CONF_VALIDATE_DB_CONN, false);
110 String evictionInterval = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL, "300000").trim();
111 String evictionNum = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_NUM, "10").trim();
112
113 if (!url.startsWith("jdbc:")) {
114 throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'");
115 }
116 String dbType = url.substring("jdbc:".length());
117 if (dbType.indexOf(":") <= 0) {
118 throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
119 }
120 dbType = dbType.substring(0, dbType.indexOf(":"));
121
122 String persistentUnit = "oozie-" + dbType;
123
124 // Checking existince of ORM file for DB type
125 String ormFile = "META-INF/" + persistentUnit + "-orm.xml";
126 try {
127 IOUtils.getResourceAsStream(ormFile, -1);
128 }
129 catch (IOException ex) {
130 throw new ServiceException(ErrorCode.E0609, dbType, ormFile);
131 }
132
133 String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
134 connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
135 Properties props = new Properties();
136 if (autoSchemaCreation) {
137 connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
138 props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
139 }
140 else if (validateDbConn) {
141 // validation can be done only if the schema already exist, else a
142 // connection cannot be obtained to create the schema.
143 String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
144 String num = "numTestsPerEvictionRun=" + evictionNum;
145 connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
146 connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN";
147 connProps = MessageFormat.format(connProps, dbSchema);
148 }
149 else {
150 connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
151 }
152 props.setProperty("openjpa.ConnectionProperties", connProps);
153
154 props.setProperty("openjpa.ConnectionDriverName", dataSource);
155
156 factory = Persistence.createEntityManagerFactory(persistentUnit, props);
157
158 EntityManager entityManager = getEntityManager();
159 entityManager.find(WorkflowActionBean.class, 1);
160 entityManager.find(WorkflowJobBean.class, 1);
161 entityManager.find(CoordinatorActionBean.class, 1);
162 entityManager.find(CoordinatorJobBean.class, 1);
163 entityManager.find(JsonWorkflowAction.class, 1);
164 entityManager.find(JsonWorkflowJob.class, 1);
165 entityManager.find(JsonCoordinatorAction.class, 1);
166 entityManager.find(JsonCoordinatorJob.class, 1);
167 entityManager.find(SLAEventBean.class, 1);
168 entityManager.find(JsonSLAEvent.class, 1);
169 entityManager.find(BundleJobBean.class, 1);
170 entityManager.find(JsonBundleJob.class, 1);
171 entityManager.find(BundleActionBean.class, 1);
172
173 LOG.info(XLog.STD, "All entities initialized");
174 // need to use a pseudo no-op transaction so all entities, datasource
175 // and connection pool are initialized one time only
176 entityManager.getTransaction().begin();
177 OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory;
178 // Mask the password with '***'
179 String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
180 LOG.info("JPA configuration: {0}", logMsg);
181 entityManager.getTransaction().commit();
182 entityManager.close();
183 }
184
185 /**
186 * Destroy the JPAService
187 */
188 public void destroy() {
189 if (factory != null && factory.isOpen()) {
190 factory.close();
191 }
192 }
193
194 /**
195 * Execute a {@link JPAExecutor}.
196 *
197 * @param executor JPAExecutor to execute.
198 * @return return value of the JPAExecutor.
199 * @throws JPAExecutorException thrown if an jpa executor failed
200 */
201 public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException {
202 EntityManager em = getEntityManager();
203 Instrumentation.Cron cron = new Instrumentation.Cron();
204 try {
205 LOG.trace("Executing JPAExecutor [{0}]", executor.getName());
206 if (instr != null) {
207 instr.incr(INSTRUMENTATION_GROUP, executor.getName(), 1);
208 }
209 cron.start();
210 em.getTransaction().begin();
211 T t = executor.execute(em);
212 if (em.getTransaction().isActive()) {
213 if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
214 throw new RuntimeException("Skipping Commit for Failover Testing");
215 }
216
217 em.getTransaction().commit();
218 }
219 return t;
220 }
221 finally {
222 cron.stop();
223 if (instr != null) {
224 instr.addCron(INSTRUMENTATION_GROUP, executor.getName(), cron);
225 }
226 try {
227 if (em.getTransaction().isActive()) {
228 LOG.warn("JPAExecutor [{0}] ended with an active transaction, rolling back", executor.getName());
229 em.getTransaction().rollback();
230 }
231 }
232 catch (Exception ex) {
233 LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex
234 .getMessage(), ex);
235 }
236 try {
237 if (em.isOpen()) {
238 em.close();
239 }
240 else {
241 LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName());
242 }
243 }
244 catch (Exception ex) {
245 LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex
246 .getMessage(), ex);
247 }
248 }
249 }
250
251 /**
252 * Return an EntityManager. Used by the StoreService. Once the StoreService is removed this method must be removed.
253 *
254 * @return an entity manager
255 */
256 EntityManager getEntityManager() {
257 return factory.createEntityManager();
258 }
259
260 }