001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.List;
024import javax.persistence.EntityManager;
025import javax.persistence.Query;
026
027import org.apache.oozie.BundleActionBean;
028import org.apache.oozie.BundleJobBean;
029import org.apache.oozie.CoordinatorActionBean;
030import org.apache.oozie.CoordinatorJobBean;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.WorkflowActionBean;
033import org.apache.oozie.WorkflowJobBean;
034import org.apache.oozie.client.rest.JsonBean;
035import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
036import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
037import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
038import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
039import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
040import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
041import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
042import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
043import org.apache.oozie.service.JPAService;
044import org.apache.oozie.service.JPAService.QueryEntry;
045import org.apache.oozie.service.Services;
046import org.apache.oozie.sla.SLARegistrationBean;
047import org.apache.oozie.sla.SLASummaryBean;
048
049/**
050 * Query Executor that provides API to run multiple update/insert queries in one
051 * transaction. This guarantees entire change to be rolled back when one of
052 * queries fails.
053 */
054public class BatchQueryExecutor {
055
056    private static BatchQueryExecutor instance = new BatchQueryExecutor();
057
058    public static class UpdateEntry<E extends Enum<E>> {
059        E namedQuery;
060        JsonBean bean;
061
062        public UpdateEntry(E namedQuery, JsonBean bean) {
063            this.bean = bean;
064            this.namedQuery = namedQuery;
065        }
066
067        public JsonBean getBean() {
068            return this.bean;
069        }
070
071        public E getQueryName() {
072            return this.namedQuery;
073        }
074    }
075
076    private BatchQueryExecutor() {
077    }
078
079    public static BatchQueryExecutor getInstance() {
080        return BatchQueryExecutor.instance;
081    }
082
083    @SuppressWarnings("rawtypes")
084    public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertList, Collection<UpdateEntry> updateList,
085            Collection<JsonBean> deleteList) throws JPAExecutorException {
086        List<QueryEntry> queryList = new ArrayList<QueryEntry>();
087        JPAService jpaService = Services.get().get(JPAService.class);
088        EntityManager em = jpaService.getEntityManager();
089
090        if (updateList != null) {
091            for (UpdateEntry entry : updateList) {
092                Query query = null;
093                JsonBean bean = entry.getBean();
094                if (bean instanceof WorkflowJobBean) {
095                    query = WorkflowJobQueryExecutor.getInstance().getUpdateQuery(
096                            (WorkflowJobQuery) entry.getQueryName(), (WorkflowJobBean) entry.getBean(), em);
097                }
098                else if (bean instanceof WorkflowActionBean) {
099                    query = WorkflowActionQueryExecutor.getInstance().getUpdateQuery(
100                            (WorkflowActionQuery) entry.getQueryName(), (WorkflowActionBean) entry.getBean(), em);
101                }
102                else if (bean instanceof CoordinatorJobBean) {
103                    query = CoordJobQueryExecutor.getInstance().getUpdateQuery((CoordJobQuery) entry.getQueryName(),
104                            (CoordinatorJobBean) entry.getBean(), em);
105                }
106                else if (bean instanceof CoordinatorActionBean) {
107                    query = CoordActionQueryExecutor.getInstance().getUpdateQuery(
108                            (CoordActionQuery) entry.getQueryName(), (CoordinatorActionBean) entry.getBean(), em);
109                }
110                else if (bean instanceof BundleJobBean) {
111                    query = BundleJobQueryExecutor.getInstance().getUpdateQuery((BundleJobQuery) entry.getQueryName(),
112                            (BundleJobBean) entry.getBean(), em);
113                }
114                else if (bean instanceof BundleActionBean) {
115                    query = BundleActionQueryExecutor.getInstance().getUpdateQuery(
116                            (BundleActionQuery) entry.getQueryName(), (BundleActionBean) entry.getBean(), em);
117                }
118                else if (bean instanceof SLARegistrationBean) {
119                    query = SLARegistrationQueryExecutor.getInstance().getUpdateQuery(
120                            (SLARegQuery) entry.getQueryName(), (SLARegistrationBean) entry.getBean(), em);
121                }
122                else if (bean instanceof SLASummaryBean) {
123                    query = SLASummaryQueryExecutor.getInstance().getUpdateQuery(
124                            (SLASummaryQuery) entry.getQueryName(), (SLASummaryBean) entry.getBean(), em);
125                }
126                else {
127                    throw new JPAExecutorException(ErrorCode.E0603, "BatchQueryExecutor faield to construct a query");
128                }
129                queryList.add(new QueryEntry(entry.getQueryName(), query));
130            }
131        }
132        jpaService.executeBatchInsertUpdateDelete(insertList, queryList, deleteList, em);
133    }
134
135}