001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.executor.jpa; 019 020import java.util.ArrayList; 021import java.util.Collection; 022import java.util.List; 023import javax.persistence.EntityManager; 024import javax.persistence.Query; 025 026import org.apache.oozie.BundleActionBean; 027import org.apache.oozie.BundleJobBean; 028import org.apache.oozie.CoordinatorActionBean; 029import org.apache.oozie.CoordinatorJobBean; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.WorkflowActionBean; 032import org.apache.oozie.WorkflowJobBean; 033import org.apache.oozie.client.rest.JsonBean; 034import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 035import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 037import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 038import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; 039import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; 040import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 041import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 042import org.apache.oozie.service.JPAService; 043import org.apache.oozie.service.JPAService.QueryEntry; 044import org.apache.oozie.service.Services; 045import org.apache.oozie.sla.SLARegistrationBean; 046import org.apache.oozie.sla.SLASummaryBean; 047 048/** 049 * Query Executor that provides API to run multiple update/insert queries in one 050 * transaction. This guarantees entire change to be rolled back when one of 051 * queries fails. 052 */ 053public class BatchQueryExecutor { 054 055 private static BatchQueryExecutor instance = new BatchQueryExecutor(); 056 057 public static class UpdateEntry<E extends Enum<E>> { 058 E namedQuery; 059 JsonBean bean; 060 061 public UpdateEntry(E namedQuery, JsonBean bean) { 062 this.bean = bean; 063 this.namedQuery = namedQuery; 064 } 065 066 public JsonBean getBean() { 067 return this.bean; 068 } 069 070 public E getQueryName() { 071 return this.namedQuery; 072 } 073 } 074 075 private BatchQueryExecutor() { 076 } 077 078 public static BatchQueryExecutor getInstance() { 079 return BatchQueryExecutor.instance; 080 } 081 082 @SuppressWarnings("rawtypes") 083 public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertList, Collection<UpdateEntry> updateList, 084 Collection<JsonBean> deleteList) throws JPAExecutorException { 085 List<QueryEntry> queryList = new ArrayList<QueryEntry>(); 086 JPAService jpaService = Services.get().get(JPAService.class); 087 EntityManager em = jpaService.getEntityManager(); 088 089 if (updateList != null) { 090 for (UpdateEntry entry : updateList) { 091 Query query = null; 092 JsonBean bean = entry.getBean(); 093 if (bean instanceof WorkflowJobBean) { 094 query = WorkflowJobQueryExecutor.getInstance().getUpdateQuery( 095 (WorkflowJobQuery) entry.getQueryName(), (WorkflowJobBean) entry.getBean(), em); 096 } 097 else if (bean instanceof WorkflowActionBean) { 098 query = WorkflowActionQueryExecutor.getInstance().getUpdateQuery( 099 (WorkflowActionQuery) entry.getQueryName(), (WorkflowActionBean) entry.getBean(), em); 100 } 101 else if (bean instanceof CoordinatorJobBean) { 102 query = CoordJobQueryExecutor.getInstance().getUpdateQuery((CoordJobQuery) entry.getQueryName(), 103 (CoordinatorJobBean) entry.getBean(), em); 104 } 105 else if (bean instanceof CoordinatorActionBean) { 106 query = CoordActionQueryExecutor.getInstance().getUpdateQuery( 107 (CoordActionQuery) entry.getQueryName(), (CoordinatorActionBean) entry.getBean(), em); 108 } 109 else if (bean instanceof BundleJobBean) { 110 query = BundleJobQueryExecutor.getInstance().getUpdateQuery((BundleJobQuery) entry.getQueryName(), 111 (BundleJobBean) entry.getBean(), em); 112 } 113 else if (bean instanceof BundleActionBean) { 114 query = BundleActionQueryExecutor.getInstance().getUpdateQuery( 115 (BundleActionQuery) entry.getQueryName(), (BundleActionBean) entry.getBean(), em); 116 } 117 else if (bean instanceof SLARegistrationBean) { 118 query = SLARegistrationQueryExecutor.getInstance().getUpdateQuery( 119 (SLARegQuery) entry.getQueryName(), (SLARegistrationBean) entry.getBean(), em); 120 } 121 else if (bean instanceof SLASummaryBean) { 122 query = SLASummaryQueryExecutor.getInstance().getUpdateQuery( 123 (SLASummaryQuery) entry.getQueryName(), (SLASummaryBean) entry.getBean(), em); 124 } 125 else { 126 throw new JPAExecutorException(ErrorCode.E0603, "BatchQueryExecutor faield to construct a query"); 127 } 128 queryList.add(new QueryEntry(entry.getQueryName(), query)); 129 } 130 } 131 jpaService.executeBatchInsertUpdateDelete(insertList, queryList, deleteList, em); 132 } 133 134}