/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.executor.jpa;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

import javax.persistence.EntityManager;
import javax.persistence.Query;

import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.StringBlob;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;

/**
 * Query Executor that provides API to run query for Bundle Job
 */
public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJobQueryExecutor.BundleJobQuery> {

    /**
     * Queries supported by this executor. Each constant's {@code name()} is passed to
     * {@link EntityManager#createNamedQuery(String)}, so it must match a named query definition.
     */
    public enum BundleJobQuery {
        UPDATE_BUNDLE_JOB,
        UPDATE_BUNDLE_JOB_STATUS,
        UPDATE_BUNDLE_JOB_STATUS_PENDING,
        UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME,
        UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME,
        UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME,
        UPDATE_BUNDLE_JOB_PAUSE_KICKOFF,
        GET_BUNDLE_JOB,
        GET_BUNDLE_JOB_STATUS,
        GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME,
        GET_BUNDLE_JOB_ID_JOBXML_CONF
    }

    /** The single shared instance handed out by {@link #getInstance()}. */
    private static BundleJobQueryExecutor instance = new BundleJobQueryExecutor();

    /** Private: instances are obtained through {@link #getInstance()}. */
    private BundleJobQueryExecutor() {
    }

    /**
     * Returns the shared executor instance.
     *
     * @return the shared {@code BundleJobQueryExecutor}
     */
    public static QueryExecutor<BundleJobBean, BundleJobQueryExecutor.BundleJobQuery> getInstance() {
        return BundleJobQueryExecutor.instance;
    }

    /**
     * Creates the named update query and binds its parameters from the given bean.
     *
     * @param namedQuery the update query to build
     * @param bjBean bean supplying the values to bind
     * @param em entity manager used to create the query
     * @return the query with all parameters bound
     * @throws JPAExecutorException with {@link ErrorCode#E0603} if {@code namedQuery} is not one of the
     *         update queries handled here
     */
    @Override
    public Query getUpdateQuery(BundleJobQuery namedQuery, BundleJobBean bjBean, EntityManager em)
            throws JPAExecutorException {
        Query query = em.createNamedQuery(namedQuery.name());
        switch (namedQuery) {
            case UPDATE_BUNDLE_JOB:
                query.setParameter("appName", bjBean.getAppName());
                query.setParameter("appPath", bjBean.getAppPath());
                query.setParameter("conf", bjBean.getConfBlob());
                query.setParameter("timeOut", bjBean.getTimeout());
                query.setParameter("createdTime", bjBean.getCreatedTimestamp());
                query.setParameter("endTime", bjBean.getEndTimestamp());
                query.setParameter("jobXml", bjBean.getJobXmlBlob());
                query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
                query.setParameter("origJobXml", bjBean.getOrigJobXmlBlob());
                query.setParameter("startTime", bjBean.getstartTimestamp());
                query.setParameter("status", bjBean.getStatus().toString());
                query.setParameter("timeUnit", bjBean.getTimeUnit());
                query.setParameter("pending", bjBean.isPending() ? 1 : 0);
                query.setParameter("id", bjBean.getId());
                break;
            case UPDATE_BUNDLE_JOB_STATUS:
                query.setParameter("status", bjBean.getStatus().toString());
                query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
                query.setParameter("pending", bjBean.getPending());
                query.setParameter("id", bjBean.getId());
                break;
            case UPDATE_BUNDLE_JOB_STATUS_PENDING:
                query.setParameter("status", bjBean.getStatus().toString());
                query.setParameter("pending", bjBean.getPending());
                query.setParameter("id", bjBean.getId());
                break;
            case UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME:
                query.setParameter("status", bjBean.getStatus().toString());
                query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
                query.setParameter("pending", bjBean.getPending());
                query.setParameter("suspendedTime", bjBean.getSuspendedTimestamp());
                query.setParameter("id", bjBean.getId());
                break;
            case UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME:
                query.setParameter("status", bjBean.getStatus().toString());
                query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
                query.setParameter("pending", bjBean.getPending());
                query.setParameter("id", bjBean.getId());
                break;
            case UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME:
                query.setParameter("status", bjBean.getStatus().toString());
                query.setParameter("pauseTime", bjBean.getPauseTimestamp());
                query.setParameter("endTime", bjBean.getEndTimestamp());
                query.setParameter("id", bjBean.getId());
                break;
            case UPDATE_BUNDLE_JOB_PAUSE_KICKOFF:
                query.setParameter("pauseTime", bjBean.getPauseTimestamp());
                query.setParameter("kickoffTime", bjBean.getKickoffTimestamp());
                query.setParameter("id", bjBean.getId());
                break;
            default:
                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                        + namedQuery.name());
        }
        return query;
    }

    /**
     * Creates the named select query and binds {@code parameters[0]} to its {@code id} parameter.
     *
     * @param namedQuery the select query to build
     * @param em entity manager used to create the query
     * @param parameters query arguments; the first element is bound as {@code id}
     * @return the query with its parameter bound
     * @throws JPAExecutorException with {@link ErrorCode#E0603} if {@code namedQuery} is not one of the
     *         select queries handled here
     */
    @Override
    public Query getSelectQuery(BundleJobQuery namedQuery, EntityManager em, Object... parameters)
            throws JPAExecutorException {
        Query query = em.createNamedQuery(namedQuery.name());
        switch (namedQuery) {
            case GET_BUNDLE_JOB:
            case GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME:
            case GET_BUNDLE_JOB_ID_JOBXML_CONF:
            case GET_BUNDLE_JOB_STATUS:
                query.setParameter("id", parameters[0]);
                break;
            default:
                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                        + namedQuery.name());
        }
        return query;
    }

    /**
     * Builds the given update query from the bean and runs it through the {@link JPAService}.
     *
     * @param namedQuery the update query to run
     * @param jobBean bean supplying the values to bind
     * @return the value returned by {@code JPAService.executeUpdate}
     * @throws JPAExecutorException if the query is not a supported update query or execution fails
     */
    @Override
    public int executeUpdate(BundleJobQuery namedQuery, BundleJobBean jobBean) throws JPAExecutorException {
        JPAService jpaService = Services.get().get(JPAService.class);
        EntityManager em = jpaService.getEntityManager();
        Query query = getUpdateQuery(namedQuery, jobBean, em);
        return jpaService.executeUpdate(namedQuery.name(), query, em);
    }

    /**
     * Runs the given select query and converts its single result into a {@link BundleJobBean}.
     *
     * @param namedQuery the select query to run
     * @param parameters query arguments, see {@link #getSelectQuery}
     * @return the bean built from the query result
     * @throws JPAExecutorException with {@link ErrorCode#E0604} if the query returns {@code null}, or if the
     *         query is not a supported select query
     */
    @Override
    public BundleJobBean get(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
        JPAService jpaService = Services.get().get(JPAService.class);
        EntityManager em = jpaService.getEntityManager();
        Query query = getSelectQuery(namedQuery, em, parameters);
        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
        if (ret == null) {
            throw new JPAExecutorException(ErrorCode.E0604, query.toString());
        }
        return constructBean(namedQuery, ret, parameters);
    }

    /**
     * Runs the given select query and converts every result row into a {@link BundleJobBean}.
     *
     * @param namedQuery the select query to run
     * @param parameters query arguments, see {@link #getSelectQuery}
     * @return the beans built from the result rows; empty (never {@code null}) when the service returns
     *         {@code null} or no rows
     * @throws JPAExecutorException if the query is not a supported select query
     */
    @Override
    public List<BundleJobBean> getList(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
        JPAService jpaService = Services.get().get(JPAService.class);
        EntityManager em = jpaService.getEntityManager();
        Query query = getSelectQuery(namedQuery, em, parameters);
        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
        List<BundleJobBean> beanList = new ArrayList<BundleJobBean>();
        if (retList != null) {
            for (Object ret : retList) {
                // Forward the caller's parameters: GET_BUNDLE_JOB_STATUS reads parameters[0] in
                // constructBean, and omitting them would pass an empty varargs array.
                beanList.add(constructBean(namedQuery, ret, parameters));
            }
        }
        return beanList;
    }

    /**
     * Converts one raw query result into a {@link BundleJobBean}, according to the columns that the given
     * query selects.
     *
     * @param namedQuery the select query that produced {@code ret}
     * @param ret the raw result: a bean, a single column value, or an {@code Object[]} row
     * @param parameters the arguments the query was run with; {@code parameters[0]} is used as the job id
     *        for {@code GET_BUNDLE_JOB_STATUS}
     * @return the populated bean
     * @throws JPAExecutorException with {@link ErrorCode#E0603} if {@code namedQuery} is not a select query
     *         handled here
     */
    private BundleJobBean constructBean(BundleJobQuery namedQuery, Object ret, Object... parameters)
            throws JPAExecutorException {
        BundleJobBean bean;
        Object[] arr;
        switch (namedQuery) {
            case GET_BUNDLE_JOB:
                bean = (BundleJobBean) ret;
                break;
            case GET_BUNDLE_JOB_STATUS:
                // The result carries only the status, so the id is taken from the query argument.
                bean = new BundleJobBean();
                bean.setId((String) parameters[0]);
                bean.setStatus((String) ret);
                break;
            case GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME:
                bean = new BundleJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setStatus((String) arr[1]);
                bean.setPending((Integer) arr[2]);
                bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[3]));
                break;
            case GET_BUNDLE_JOB_ID_JOBXML_CONF:
                bean = new BundleJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setJobXmlBlob((StringBlob) arr[1]);
                bean.setConfBlob((StringBlob) arr[2]);
                break;
            default:
                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
                        + namedQuery.name());
        }
        return bean;
    }

    /**
     * Not supported by this executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Object getSingleValue(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
        throw new UnsupportedOperationException();
    }
}