001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.List;
023import javax.persistence.EntityManager;
024import javax.persistence.Query;
025
026import org.apache.oozie.BundleActionBean;
027import org.apache.oozie.BundleJobBean;
028import org.apache.oozie.CoordinatorActionBean;
029import org.apache.oozie.CoordinatorJobBean;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.WorkflowActionBean;
032import org.apache.oozie.WorkflowJobBean;
033import org.apache.oozie.client.rest.JsonBean;
034import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
035import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
037import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
038import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
039import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
040import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
041import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
042import org.apache.oozie.service.JPAService;
043import org.apache.oozie.service.JPAService.QueryEntry;
044import org.apache.oozie.service.Services;
045import org.apache.oozie.sla.SLARegistrationBean;
046import org.apache.oozie.sla.SLASummaryBean;
047
048/**
049 * Query Executor that provides API to run multiple update/insert queries in one
050 * transaction. This guarantees entire change to be rolled back when one of
051 * queries fails.
052 */
053public class BatchQueryExecutor {
054
055    private static BatchQueryExecutor instance = new BatchQueryExecutor();
056
057    public static class UpdateEntry<E extends Enum<E>> {
058        E namedQuery;
059        JsonBean bean;
060
061        public UpdateEntry(E namedQuery, JsonBean bean) {
062            this.bean = bean;
063            this.namedQuery = namedQuery;
064        }
065
066        public JsonBean getBean() {
067            return this.bean;
068        }
069
070        public E getQueryName() {
071            return this.namedQuery;
072        }
073    }
074
075    private BatchQueryExecutor() {
076    }
077
078    public static BatchQueryExecutor getInstance() {
079        return BatchQueryExecutor.instance;
080    }
081
082    @SuppressWarnings("rawtypes")
083    public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertList, Collection<UpdateEntry> updateList,
084            Collection<JsonBean> deleteList) throws JPAExecutorException {
085        List<QueryEntry> queryList = new ArrayList<QueryEntry>();
086        JPAService jpaService = Services.get().get(JPAService.class);
087        EntityManager em = jpaService.getEntityManager();
088
089        if (updateList != null) {
090            for (UpdateEntry entry : updateList) {
091                Query query = null;
092                JsonBean bean = entry.getBean();
093                if (bean instanceof WorkflowJobBean) {
094                    query = WorkflowJobQueryExecutor.getInstance().getUpdateQuery(
095                            (WorkflowJobQuery) entry.getQueryName(), (WorkflowJobBean) entry.getBean(), em);
096                }
097                else if (bean instanceof WorkflowActionBean) {
098                    query = WorkflowActionQueryExecutor.getInstance().getUpdateQuery(
099                            (WorkflowActionQuery) entry.getQueryName(), (WorkflowActionBean) entry.getBean(), em);
100                }
101                else if (bean instanceof CoordinatorJobBean) {
102                    query = CoordJobQueryExecutor.getInstance().getUpdateQuery((CoordJobQuery) entry.getQueryName(),
103                            (CoordinatorJobBean) entry.getBean(), em);
104                }
105                else if (bean instanceof CoordinatorActionBean) {
106                    query = CoordActionQueryExecutor.getInstance().getUpdateQuery(
107                            (CoordActionQuery) entry.getQueryName(), (CoordinatorActionBean) entry.getBean(), em);
108                }
109                else if (bean instanceof BundleJobBean) {
110                    query = BundleJobQueryExecutor.getInstance().getUpdateQuery((BundleJobQuery) entry.getQueryName(),
111                            (BundleJobBean) entry.getBean(), em);
112                }
113                else if (bean instanceof BundleActionBean) {
114                    query = BundleActionQueryExecutor.getInstance().getUpdateQuery(
115                            (BundleActionQuery) entry.getQueryName(), (BundleActionBean) entry.getBean(), em);
116                }
117                else if (bean instanceof SLARegistrationBean) {
118                    query = SLARegistrationQueryExecutor.getInstance().getUpdateQuery(
119                            (SLARegQuery) entry.getQueryName(), (SLARegistrationBean) entry.getBean(), em);
120                }
121                else if (bean instanceof SLASummaryBean) {
122                    query = SLASummaryQueryExecutor.getInstance().getUpdateQuery(
123                            (SLASummaryQuery) entry.getQueryName(), (SLASummaryBean) entry.getBean(), em);
124                }
125                else {
126                    throw new JPAExecutorException(ErrorCode.E0603, "BatchQueryExecutor faield to construct a query");
127                }
128                queryList.add(new QueryEntry(entry.getQueryName(), query));
129            }
130        }
131        jpaService.executeBatchInsertUpdateDelete(insertList, queryList, deleteList, em);
132    }
133
134}