001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.executor.jpa; 020 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.List; 024import javax.persistence.EntityManager; 025import javax.persistence.Query; 026 027import org.apache.oozie.BundleActionBean; 028import org.apache.oozie.BundleJobBean; 029import org.apache.oozie.CoordinatorActionBean; 030import org.apache.oozie.CoordinatorJobBean; 031import org.apache.oozie.ErrorCode; 032import org.apache.oozie.WorkflowActionBean; 033import org.apache.oozie.WorkflowJobBean; 034import org.apache.oozie.client.rest.JsonBean; 035import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 036import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 037import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 038import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 039import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; 040import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; 041import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 042import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 043import org.apache.oozie.service.JPAService; 044import org.apache.oozie.service.JPAService.QueryEntry; 045import org.apache.oozie.service.Services; 046import org.apache.oozie.sla.SLARegistrationBean; 047import org.apache.oozie.sla.SLASummaryBean; 048 049/** 050 * Query Executor that provides API to run multiple update/insert queries in one 051 * transaction. This guarantees entire change to be rolled back when one of 052 * queries fails. 053 */ 054public class BatchQueryExecutor { 055 056 private static BatchQueryExecutor instance = new BatchQueryExecutor(); 057 058 public static class UpdateEntry<E extends Enum<E>> { 059 E namedQuery; 060 JsonBean bean; 061 062 public UpdateEntry(E namedQuery, JsonBean bean) { 063 this.bean = bean; 064 this.namedQuery = namedQuery; 065 } 066 067 public JsonBean getBean() { 068 return this.bean; 069 } 070 071 public E getQueryName() { 072 return this.namedQuery; 073 } 074 } 075 076 private BatchQueryExecutor() { 077 } 078 079 public static BatchQueryExecutor getInstance() { 080 return BatchQueryExecutor.instance; 081 } 082 083 @SuppressWarnings("rawtypes") 084 public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertList, Collection<UpdateEntry> updateList, 085 Collection<JsonBean> deleteList) throws JPAExecutorException { 086 List<QueryEntry> queryList = new ArrayList<QueryEntry>(); 087 JPAService jpaService = Services.get().get(JPAService.class); 088 EntityManager em = jpaService.getEntityManager(); 089 090 if (updateList != null) { 091 for (UpdateEntry entry : updateList) { 092 Query query = null; 093 JsonBean bean = entry.getBean(); 094 if (bean instanceof WorkflowJobBean) { 095 query = WorkflowJobQueryExecutor.getInstance().getUpdateQuery( 096 (WorkflowJobQuery) entry.getQueryName(), (WorkflowJobBean) entry.getBean(), em); 097 } 098 else if (bean instanceof WorkflowActionBean) { 099 query = WorkflowActionQueryExecutor.getInstance().getUpdateQuery( 100 (WorkflowActionQuery) entry.getQueryName(), (WorkflowActionBean) entry.getBean(), em); 101 } 102 else if (bean instanceof CoordinatorJobBean) { 103 query = CoordJobQueryExecutor.getInstance().getUpdateQuery((CoordJobQuery) entry.getQueryName(), 104 (CoordinatorJobBean) entry.getBean(), em); 105 } 106 else if (bean instanceof CoordinatorActionBean) { 107 query = CoordActionQueryExecutor.getInstance().getUpdateQuery( 108 (CoordActionQuery) entry.getQueryName(), (CoordinatorActionBean) entry.getBean(), em); 109 } 110 else if (bean instanceof BundleJobBean) { 111 query = BundleJobQueryExecutor.getInstance().getUpdateQuery((BundleJobQuery) entry.getQueryName(), 112 (BundleJobBean) entry.getBean(), em); 113 } 114 else if (bean instanceof BundleActionBean) { 115 query = BundleActionQueryExecutor.getInstance().getUpdateQuery( 116 (BundleActionQuery) entry.getQueryName(), (BundleActionBean) entry.getBean(), em); 117 } 118 else if (bean instanceof SLARegistrationBean) { 119 query = SLARegistrationQueryExecutor.getInstance().getUpdateQuery( 120 (SLARegQuery) entry.getQueryName(), (SLARegistrationBean) entry.getBean(), em); 121 } 122 else if (bean instanceof SLASummaryBean) { 123 query = SLASummaryQueryExecutor.getInstance().getUpdateQuery( 124 (SLASummaryQuery) entry.getQueryName(), (SLASummaryBean) entry.getBean(), em); 125 } 126 else { 127 throw new JPAExecutorException(ErrorCode.E0603, "BatchQueryExecutor faield to construct a query"); 128 } 129 queryList.add(new QueryEntry(entry.getQueryName(), query)); 130 } 131 } 132 jpaService.executeBatchInsertUpdateDelete(insertList, queryList, deleteList, em); 133 } 134 135}