001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.executor.jpa; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.List; 024 025import javax.persistence.EntityManager; 026import javax.persistence.Query; 027 028import org.apache.oozie.BundleActionBean; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.service.JPAService; 031import org.apache.oozie.service.Services; 032 033/** 034 * Query Executor that provides API to run query for Bundle Action 035 */ 036public class BundleActionQueryExecutor extends 037 QueryExecutor<BundleActionBean, BundleActionQueryExecutor.BundleActionQuery> { 038 039 public enum BundleActionQuery { 040 UPDATE_BUNDLE_ACTION_PENDING_MODTIME, 041 UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, 042 UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, 043 GET_BUNDLE_ACTION, 044 GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, 045 GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, 046 GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE 047 }; 048 049 private static BundleActionQueryExecutor instance = new BundleActionQueryExecutor(); 050 051 private BundleActionQueryExecutor() { 052 } 053 054 public static QueryExecutor<BundleActionBean, BundleActionQueryExecutor.BundleActionQuery> getInstance() { 055 return BundleActionQueryExecutor.instance; 056 } 057 058 @Override 059 public Query getUpdateQuery(BundleActionQuery namedQuery, BundleActionBean baBean, EntityManager em) 060 throws JPAExecutorException { 061 062 Query query = em.createNamedQuery(namedQuery.name()); 063 switch (namedQuery) { 064 case UPDATE_BUNDLE_ACTION_PENDING_MODTIME: 065 query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp()); 066 query.setParameter("pending", baBean.getPending()); 067 query.setParameter("bundleActionId", baBean.getBundleActionId()); 068 break; 069 case UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME: 070 query.setParameter("status", baBean.getStatusStr()); 071 query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp()); 072 query.setParameter("pending", baBean.getPending()); 073 query.setParameter("bundleActionId", baBean.getBundleActionId()); 074 break; 075 case UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID: 076 query.setParameter("status", baBean.getStatusStr()); 077 query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp()); 078 query.setParameter("pending", baBean.getPending()); 079 query.setParameter("coordId", baBean.getCoordId()); 080 query.setParameter("bundleActionId", baBean.getBundleActionId()); 081 break; 082 default: 083 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 084 + namedQuery.name()); 085 } 086 return query; 087 } 088 089 @Override 090 public Query getSelectQuery(BundleActionQuery namedQuery, EntityManager em, Object... parameters) 091 throws JPAExecutorException { 092 Query query = em.createNamedQuery(namedQuery.name()); 093 switch (namedQuery) { 094 case GET_BUNDLE_ACTION: 095 query.setParameter("bundleActionId", parameters[0]); 096 break; 097 case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE: 098 query.setParameter("bundleId", parameters[0]); 099 break; 100 case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN: 101 Timestamp ts = new Timestamp(System.currentTimeMillis() - (Long)parameters[0] * 1000); 102 query.setParameter("lastModifiedTime", ts); 103 break; 104 case GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE: 105 query.setParameter("bundleId", parameters[0]); 106 break; 107 default: 108 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 109 + namedQuery.name()); 110 } 111 return query; 112 } 113 114 @Override 115 public int executeUpdate(BundleActionQuery namedQuery, BundleActionBean jobBean) throws JPAExecutorException { 116 JPAService jpaService = Services.get().get(JPAService.class); 117 EntityManager em = jpaService.getEntityManager(); 118 Query query = getUpdateQuery(namedQuery, jobBean, em); 119 int ret = jpaService.executeUpdate(namedQuery.name(), query, em); 120 return ret; 121 } 122 123 @Override 124 public BundleActionBean get(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 125 BundleActionBean bean = getIfExist(namedQuery, parameters); 126 if (bean == null) { 127 throw new JPAExecutorException(ErrorCode.E0604, getSelectQuery(namedQuery, 128 Services.get().get(JPAService.class).getEntityManager(), parameters)); 129 } 130 return bean; 131 } 132 133 private BundleActionBean constructBean(BundleActionQuery namedQuery, Object ret) throws JPAExecutorException { 134 BundleActionBean bean; 135 Object[] arr; 136 switch (namedQuery) { 137 case GET_BUNDLE_ACTION: 138 case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE: 139 bean = (BundleActionBean) ret; 140 break; 141 case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN: 142 bean = new BundleActionBean(); 143 arr = (Object[]) ret; 144 bean.setBundleActionId((String) arr[0]); 145 bean.setBundleId((String) arr[1]); 146 bean.setStatusStr((String) arr[2]); 147 bean.setCoordId((String) arr[3]); 148 bean.setCoordName((String) arr[4]); 149 break; 150 case GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE: 151 bean = new BundleActionBean(); 152 arr = (Object[]) ret; 153 bean.setCoordId((String) arr[0]); 154 bean.setStatusStr((String) arr[1]); 155 bean.setPending((Integer) arr[2]); 156 break; 157 default: 158 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " 159 + namedQuery.name()); 160 } 161 return bean; 162 } 163 164 @Override 165 public List<BundleActionBean> getList(BundleActionQuery namedQuery, Object... parameters) 166 throws JPAExecutorException { 167 JPAService jpaService = Services.get().get(JPAService.class); 168 EntityManager em = jpaService.getEntityManager(); 169 Query query = getSelectQuery(namedQuery, em, parameters); 170 List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); 171 List<BundleActionBean> beanList = new ArrayList<BundleActionBean>(); 172 if (retList != null) { 173 for (Object ret : retList) { 174 beanList.add(constructBean(namedQuery, ret)); 175 } 176 } 177 return beanList; 178 } 179 180 @Override 181 public Object getSingleValue(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 182 throw new UnsupportedOperationException(); 183 } 184 185 @Override 186 public BundleActionBean getIfExist(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 187 JPAService jpaService = Services.get().get(JPAService.class); 188 EntityManager em = jpaService.getEntityManager(); 189 Query query = getSelectQuery(namedQuery, em, parameters); 190 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 191 if (ret == null) { 192 return null; 193 } 194 BundleActionBean bean = constructBean(namedQuery, ret); 195 return bean; 196 } 197 198}