001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.executor.jpa; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025 026import javax.persistence.EntityManager; 027import javax.persistence.Query; 028 029import org.apache.oozie.BundleJobBean; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.StringBlob; 032import org.apache.oozie.service.JPAService; 033import org.apache.oozie.service.Services; 034import org.apache.oozie.util.DateUtils; 035 036/** 037 * Query Executor that provides API to run query for Bundle Job 038 */ 039public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJobQueryExecutor.BundleJobQuery> { 040 041 public enum BundleJobQuery { 042 UPDATE_BUNDLE_JOB, 043 UPDATE_BUNDLE_JOB_STATUS, 044 UPDATE_BUNDLE_JOB_STATUS_PENDING, 045 UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME, 046 UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME, 047 UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME, 048 UPDATE_BUNDLE_JOB_PAUSE_KICKOFF, 049 GET_BUNDLE_JOB, 050 GET_BUNDLE_JOB_STATUS, 051 GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, 052 GET_BUNDLE_JOB_ID_JOBXML_CONF, 053 GET_BUNDLE_IDS_FOR_STATUS_TRANSIT 054 }; 055 056 private static BundleJobQueryExecutor instance = new BundleJobQueryExecutor(); 057 058 private BundleJobQueryExecutor() { 059 } 060 061 public static QueryExecutor<BundleJobBean, BundleJobQueryExecutor.BundleJobQuery> getInstance() { 062 return BundleJobQueryExecutor.instance; 063 } 064 065 @Override 066 public Query getUpdateQuery(BundleJobQuery namedQuery, BundleJobBean bjBean, EntityManager em) 067 throws JPAExecutorException { 068 Query query = em.createNamedQuery(namedQuery.name()); 069 switch (namedQuery) { 070 case UPDATE_BUNDLE_JOB: 071 query.setParameter("appName", bjBean.getAppName()); 072 query.setParameter("appPath", bjBean.getAppPath()); 073 query.setParameter("conf", bjBean.getConfBlob()); 074 query.setParameter("timeOut", bjBean.getTimeout()); 075 query.setParameter("createdTime", bjBean.getCreatedTimestamp()); 076 query.setParameter("endTime", bjBean.getEndTimestamp()); 077 query.setParameter("jobXml", bjBean.getJobXmlBlob()); 078 query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp()); 079 query.setParameter("origJobXml", bjBean.getOrigJobXmlBlob()); 080 query.setParameter("startTime", bjBean.getstartTimestamp()); 081 query.setParameter("status", bjBean.getStatus().toString()); 082 query.setParameter("timeUnit", bjBean.getTimeUnitStr()); 083 query.setParameter("pending", bjBean.isPending() ? 1 : 0); 084 query.setParameter("id", bjBean.getId()); 085 break; 086 case UPDATE_BUNDLE_JOB_STATUS: 087 query.setParameter("status", bjBean.getStatus().toString()); 088 query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp()); 089 query.setParameter("pending", bjBean.getPending()); 090 query.setParameter("id", bjBean.getId()); 091 break; 092 case UPDATE_BUNDLE_JOB_STATUS_PENDING: 093 query.setParameter("status", bjBean.getStatus().toString()); 094 query.setParameter("pending", bjBean.getPending()); 095 query.setParameter("id", bjBean.getId()); 096 break; 097 case UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME: 098 query.setParameter("status", bjBean.getStatus().toString()); 099 query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp()); 100 query.setParameter("pending", bjBean.getPending()); 101 query.setParameter("suspendedTime", bjBean.getSuspendedTimestamp()); 102 query.setParameter("id", bjBean.getId()); 103 break; 104 case UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME: 105 query.setParameter("status", bjBean.getStatus().toString()); 106 query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp()); 107 query.setParameter("pending", bjBean.getPending()); 108 query.setParameter("id", bjBean.getId()); 109 break; 110 case UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME: 111 query.setParameter("status", bjBean.getStatus().toString()); 112 query.setParameter("pauseTime", bjBean.getPauseTimestamp()); 113 query.setParameter("endTime", bjBean.getEndTimestamp()); 114 query.setParameter("id", bjBean.getId()); 115 break; 116 case UPDATE_BUNDLE_JOB_PAUSE_KICKOFF: 117 query.setParameter("pauseTime", bjBean.getPauseTimestamp()); 118 query.setParameter("kickoffTime", bjBean.getKickoffTimestamp()); 119 query.setParameter("id", bjBean.getId()); 120 break; 121 default: 122 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 123 + namedQuery.name()); 124 } 125 return query; 126 } 127 128 @Override 129 public Query getSelectQuery(BundleJobQuery namedQuery, EntityManager em, Object... parameters) 130 throws JPAExecutorException { 131 Query query = em.createNamedQuery(namedQuery.name()); 132 switch (namedQuery) { 133 case GET_BUNDLE_JOB: 134 case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME: 135 case GET_BUNDLE_JOB_ID_JOBXML_CONF: 136 case GET_BUNDLE_JOB_STATUS: 137 query.setParameter("id", parameters[0]); 138 break; 139 case GET_BUNDLE_IDS_FOR_STATUS_TRANSIT: 140 query.setParameter("lastModifiedTime", DateUtils.convertDateToTimestamp((Date)parameters[0])); 141 break; 142 default: 143 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 144 + namedQuery.name()); 145 } 146 return query; 147 } 148 149 @Override 150 public int executeUpdate(BundleJobQuery namedQuery, BundleJobBean jobBean) throws JPAExecutorException { 151 JPAService jpaService = Services.get().get(JPAService.class); 152 EntityManager em = jpaService.getEntityManager(); 153 Query query = getUpdateQuery(namedQuery, jobBean, em); 154 int ret = jpaService.executeUpdate(namedQuery.name(), query, em); 155 return ret; 156 } 157 158 @Override 159 public BundleJobBean get(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException { 160 BundleJobBean bean = getIfExist(namedQuery, parameters); 161 if (bean == null) { 162 throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery, 163 Services.get().get(JPAService.class).getEntityManager(), parameters).toString()); 164 } 165 return bean; 166 } 167 168 @Override 169 public BundleJobBean getIfExist(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException { 170 JPAService jpaService = Services.get().get(JPAService.class); 171 EntityManager em = jpaService.getEntityManager(); 172 Query query = getSelectQuery(namedQuery, em, parameters); 173 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 174 if (ret == null) { 175 return null; 176 } 177 BundleJobBean bean = constructBean(namedQuery, ret, parameters); 178 return bean; 179 } 180 181 @Override 182 public List<BundleJobBean> getList(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException { 183 JPAService jpaService = Services.get().get(JPAService.class); 184 EntityManager em = jpaService.getEntityManager(); 185 Query query = getSelectQuery(namedQuery, em, parameters); 186 List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); 187 List<BundleJobBean> beanList = new ArrayList<BundleJobBean>(); 188 if (retList != null) { 189 for (Object ret : retList) { 190 beanList.add(constructBean(namedQuery, ret)); 191 } 192 } 193 return beanList; 194 } 195 196 private BundleJobBean constructBean(BundleJobQuery namedQuery, Object ret, Object... parameters) 197 throws JPAExecutorException { 198 BundleJobBean bean; 199 Object[] arr; 200 switch (namedQuery) { 201 case GET_BUNDLE_JOB: 202 bean = (BundleJobBean) ret; 203 break; 204 case GET_BUNDLE_JOB_STATUS: 205 bean = new BundleJobBean(); 206 bean.setId((String) parameters[0]); 207 bean.setStatus((String) ret); 208 break; 209 case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME: 210 bean = new BundleJobBean(); 211 arr = (Object[]) ret; 212 bean.setId((String) arr[0]); 213 bean.setStatus((String) arr[1]); 214 bean.setPending((Integer) arr[2]); 215 bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[3])); 216 bean.setPauseTime(DateUtils.toDate((Timestamp) arr[4])); 217 bean.setSuspendedTime(DateUtils.toDate((Timestamp) arr[5])); 218 219 break; 220 case GET_BUNDLE_JOB_ID_JOBXML_CONF: 221 bean = new BundleJobBean(); 222 arr = (Object[]) ret; 223 bean.setId((String) arr[0]); 224 bean.setJobXmlBlob((StringBlob) arr[1]); 225 bean.setConfBlob((StringBlob) arr[2]); 226 break; 227 228 case GET_BUNDLE_IDS_FOR_STATUS_TRANSIT: 229 bean = new BundleJobBean(); 230 bean.setId((String) ret); 231 break; 232 233 default: 234 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " 235 + namedQuery.name()); 236 } 237 return bean; 238 } 239 240 @Override 241 public Object getSingleValue(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException { 242 throw new UnsupportedOperationException(); 243 } 244}