001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.executor.jpa; 019 020import java.sql.Timestamp; 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.List; 024 025import javax.persistence.EntityManager; 026import javax.persistence.Query; 027 028import org.apache.oozie.CoordinatorJobBean; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.StringBlob; 031import org.apache.oozie.service.JPAService; 032import org.apache.oozie.service.Services; 033import org.apache.oozie.util.DateUtils; 034 035/** 036 * Query Executor that provides API to run query for Coordinator Job 037 */ 038 039public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, CoordJobQueryExecutor.CoordJobQuery> { 040 041 public enum CoordJobQuery { 042 UPDATE_COORD_JOB, 043 UPDATE_COORD_JOB_STATUS, 044 UPDATE_COORD_JOB_BUNDLEID, 045 UPDATE_COORD_JOB_APPNAMESPACE, 046 UPDATE_COORD_JOB_STATUS_PENDING, 047 UPDATE_COORD_JOB_BUNDLEID_APPNAMESPACE_PAUSETIME, 048 UPDATE_COORD_JOB_STATUS_MODTIME, 049 UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, 050 UPDATE_COORD_JOB_LAST_MODIFIED_TIME, 051 UPDATE_COORD_JOB_STATUS_PENDING_TIME, 052 UPDATE_COORD_JOB_MATERIALIZE, 053 UPDATE_COORD_JOB_CHANGE, 054 GET_COORD_JOB, 055 GET_COORD_JOB_USER_APPNAME, 056 GET_COORD_JOB_INPUT_CHECK, 057 GET_COORD_JOB_ACTION_READY, 058 GET_COORD_JOB_ACTION_KILL, 059 GET_COORD_JOB_MATERIALIZE, 060 GET_COORD_JOB_SUSPEND_KILL, 061 GET_COORD_JOB_STATUS_PARENTID, 062 GET_COORD_JOBS_CHANGED, 063 GET_COORD_JOBS_OLDER_FOR_MATERILZATION 064 }; 065 066 private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor(); 067 068 private CoordJobQueryExecutor() { 069 } 070 071 public static CoordJobQueryExecutor getInstance() { 072 return CoordJobQueryExecutor.instance; 073 } 074 075 @Override 076 public Query getUpdateQuery(CoordJobQuery namedQuery, CoordinatorJobBean cjBean, EntityManager em) 077 throws JPAExecutorException { 078 Query query = em.createNamedQuery(namedQuery.name()); 079 switch (namedQuery) { 080 case UPDATE_COORD_JOB: 081 query.setParameter("appName", cjBean.getAppName()); 082 query.setParameter("appPath", cjBean.getAppPath()); 083 query.setParameter("concurrency", cjBean.getConcurrency()); 084 query.setParameter("conf", cjBean.getConfBlob()); 085 query.setParameter("externalId", cjBean.getExternalId()); 086 query.setParameter("frequency", cjBean.getFrequency()); 087 query.setParameter("lastActionNumber", cjBean.getLastActionNumber()); 088 query.setParameter("timeOut", cjBean.getTimeout()); 089 query.setParameter("timeZone", cjBean.getTimeZone()); 090 query.setParameter("createdTime", cjBean.getCreatedTimestamp()); 091 query.setParameter("endTime", cjBean.getEndTimestamp()); 092 query.setParameter("execution", cjBean.getExecution()); 093 query.setParameter("jobXml", cjBean.getJobXmlBlob()); 094 query.setParameter("lastAction", cjBean.getLastActionTimestamp()); 095 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 096 query.setParameter("nextMaterializedTime", cjBean.getNextMaterializedTimestamp()); 097 query.setParameter("origJobXml", cjBean.getOrigJobXmlBlob()); 098 query.setParameter("slaXml", cjBean.getSlaXmlBlob()); 099 query.setParameter("startTime", cjBean.getStartTimestamp()); 100 query.setParameter("status", cjBean.getStatus().toString()); 101 query.setParameter("timeUnit", cjBean.getTimeUnitStr()); 102 query.setParameter("appNamespace", cjBean.getAppNamespace()); 103 query.setParameter("bundleId", cjBean.getBundleId()); 104 query.setParameter("matThrottling", cjBean.getMatThrottling()); 105 query.setParameter("id", cjBean.getId()); 106 break; 107 case UPDATE_COORD_JOB_STATUS: 108 query.setParameter("status", cjBean.getStatus().toString()); 109 query.setParameter("id", cjBean.getId()); 110 break; 111 case UPDATE_COORD_JOB_BUNDLEID: 112 query.setParameter("bundleId", cjBean.getBundleId()); 113 query.setParameter("id", cjBean.getId()); 114 break; 115 case UPDATE_COORD_JOB_APPNAMESPACE: 116 query.setParameter("appNamespace", cjBean.getAppNamespace()); 117 query.setParameter("id", cjBean.getId()); 118 break; 119 case UPDATE_COORD_JOB_STATUS_PENDING: 120 query.setParameter("status", cjBean.getStatus().toString()); 121 query.setParameter("pending", cjBean.isPending() ? 1 : 0); 122 query.setParameter("id", cjBean.getId()); 123 break; 124 case UPDATE_COORD_JOB_BUNDLEID_APPNAMESPACE_PAUSETIME: 125 query.setParameter("bundleId", cjBean.getBundleId()); 126 query.setParameter("appNamespace", cjBean.getAppNamespace()); 127 query.setParameter("pauseTime", cjBean.getPauseTimestamp()); 128 query.setParameter("id", cjBean.getId()); 129 break; 130 case UPDATE_COORD_JOB_STATUS_MODTIME: 131 query.setParameter("status", cjBean.getStatus().toString()); 132 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 133 query.setParameter("id", cjBean.getId()); 134 break; 135 case UPDATE_COORD_JOB_STATUS_PENDING_MODTIME: 136 query.setParameter("status", cjBean.getStatus().toString()); 137 query.setParameter("pending", cjBean.isPending() ? 1 : 0); 138 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 139 query.setParameter("id", cjBean.getId()); 140 break; 141 case UPDATE_COORD_JOB_LAST_MODIFIED_TIME: 142 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 143 query.setParameter("id", cjBean.getId()); 144 break; 145 case UPDATE_COORD_JOB_STATUS_PENDING_TIME: 146 query.setParameter("status", cjBean.getStatus().toString()); 147 query.setParameter("pending", cjBean.isPending() ? 1 : 0); 148 query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0); 149 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 150 query.setParameter("suspendedTime", cjBean.getSuspendedTimestamp()); 151 query.setParameter("id", cjBean.getId()); 152 break; 153 case UPDATE_COORD_JOB_MATERIALIZE: 154 query.setParameter("status", cjBean.getStatus().toString()); 155 query.setParameter("pending", cjBean.isPending() ? 1 : 0); 156 query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0); 157 query.setParameter("lastActionTime", cjBean.getLastActionTimestamp()); 158 query.setParameter("lastActionNumber", cjBean.getLastActionNumber()); 159 query.setParameter("nextMatdTime", cjBean.getNextMaterializedTimestamp()); 160 query.setParameter("id", cjBean.getId()); 161 break; 162 case UPDATE_COORD_JOB_CHANGE: 163 query.setParameter("endTime", cjBean.getEndTimestamp()); 164 query.setParameter("status", cjBean.getStatus().toString()); 165 query.setParameter("pending", cjBean.isPending() ? 1 : 0); 166 query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0); 167 query.setParameter("concurrency", cjBean.getConcurrency()); 168 query.setParameter("pauseTime", cjBean.getPauseTimestamp()); 169 query.setParameter("lastActionNumber", cjBean.getLastActionNumber()); 170 query.setParameter("lastActionTime", cjBean.getLastActionTimestamp()); 171 query.setParameter("nextMatdTime", cjBean.getNextMaterializedTimestamp()); 172 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 173 query.setParameter("id", cjBean.getId()); 174 break; 175 default: 176 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 177 + namedQuery.name()); 178 } 179 return query; 180 } 181 182 @Override 183 public Query getSelectQuery(CoordJobQuery namedQuery, EntityManager em, Object... parameters) 184 throws JPAExecutorException { 185 Query query = em.createNamedQuery(namedQuery.name()); 186 switch (namedQuery) { 187 case GET_COORD_JOB: 188 case GET_COORD_JOB_USER_APPNAME: 189 case GET_COORD_JOB_INPUT_CHECK: 190 case GET_COORD_JOB_ACTION_READY: 191 case GET_COORD_JOB_ACTION_KILL: 192 case GET_COORD_JOB_MATERIALIZE: 193 case GET_COORD_JOB_SUSPEND_KILL: 194 case GET_COORD_JOB_STATUS_PARENTID: 195 query.setParameter("id", parameters[0]); 196 break; 197 case GET_COORD_JOBS_CHANGED: 198 query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime())); 199 break; 200 case GET_COORD_JOBS_OLDER_FOR_MATERILZATION: 201 query.setParameter("matTime", new Timestamp(((Date)parameters[0]).getTime())); 202 int limit = (Integer) parameters[1]; 203 if (limit > 0) { 204 query.setMaxResults(limit); 205 } 206 break; 207 208 default: 209 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 210 + namedQuery.name()); 211 } 212 return query; 213 } 214 215 @Override 216 public int executeUpdate(CoordJobQuery namedQuery, CoordinatorJobBean jobBean) throws JPAExecutorException { 217 JPAService jpaService = Services.get().get(JPAService.class); 218 EntityManager em = jpaService.getEntityManager(); 219 Query query = getUpdateQuery(namedQuery, jobBean, em); 220 int ret = jpaService.executeUpdate(namedQuery.name(), query, em); 221 return ret; 222 } 223 224 private CoordinatorJobBean constructBean(CoordJobQuery namedQuery, Object ret, Object... parameters) 225 throws JPAExecutorException { 226 CoordinatorJobBean bean; 227 Object[] arr; 228 switch (namedQuery) { 229 case GET_COORD_JOB: 230 bean = (CoordinatorJobBean) ret; 231 break; 232 case GET_COORD_JOB_USER_APPNAME: 233 bean = new CoordinatorJobBean(); 234 arr = (Object[]) ret; 235 bean.setUser((String) arr[0]); 236 bean.setAppName((String) arr[1]); 237 break; 238 case GET_COORD_JOB_INPUT_CHECK: 239 bean = new CoordinatorJobBean(); 240 arr = (Object[]) ret; 241 bean.setUser((String) arr[0]); 242 bean.setAppName((String) arr[1]); 243 bean.setStatusStr((String) arr[2]); 244 bean.setAppNamespace((String) arr[3]); 245 bean.setExecution((String) arr[4]); 246 bean.setFrequency((String) arr[5]); 247 bean.setTimeUnitStr((String) arr[6]); 248 bean.setTimeZone((String) arr[7]); 249 bean.setEndTime(DateUtils.toDate((Timestamp) arr[8])); 250 break; 251 case GET_COORD_JOB_ACTION_READY: 252 bean = new CoordinatorJobBean(); 253 arr = (Object[]) ret; 254 bean.setId((String) arr[0]); 255 bean.setUser((String) arr[1]); 256 bean.setGroup((String) arr[2]); 257 bean.setAppName((String) arr[3]); 258 bean.setStatusStr((String) arr[4]); 259 bean.setExecution((String) arr[5]); 260 bean.setConcurrency((Integer) arr[6]); 261 break; 262 case GET_COORD_JOB_ACTION_KILL: 263 bean = new CoordinatorJobBean(); 264 arr = (Object[]) ret; 265 bean.setId((String) arr[0]); 266 bean.setUser((String) arr[1]); 267 bean.setGroup((String) arr[2]); 268 bean.setAppName((String) arr[3]); 269 bean.setStatusStr((String) arr[4]); 270 break; 271 case GET_COORD_JOB_MATERIALIZE: 272 bean = new CoordinatorJobBean(); 273 arr = (Object[]) ret; 274 bean.setId((String) arr[0]); 275 bean.setUser((String) arr[1]); 276 bean.setGroup((String) arr[2]); 277 bean.setAppName((String) arr[3]); 278 bean.setStatusStr((String) arr[4]); 279 bean.setFrequency((String) arr[5]); 280 bean.setMatThrottling((Integer) arr[6]); 281 bean.setTimeout((Integer) arr[7]); 282 bean.setTimeZone((String) arr[8]); 283 bean.setStartTime(DateUtils.toDate((Timestamp) arr[9])); 284 bean.setEndTime(DateUtils.toDate((Timestamp) arr[10])); 285 bean.setPauseTime(DateUtils.toDate((Timestamp) arr[11])); 286 bean.setNextMaterializedTime(DateUtils.toDate((Timestamp) arr[12])); 287 bean.setLastActionTime(DateUtils.toDate((Timestamp) arr[13])); 288 bean.setLastActionNumber((Integer) arr[14]); 289 bean.setDoneMaterialization((Integer) arr[15]); 290 bean.setBundleId((String) arr[16]); 291 bean.setConfBlob((StringBlob) arr[17]); 292 bean.setJobXmlBlob((StringBlob) arr[18]); 293 bean.setAppNamespace((String) arr[19]); 294 bean.setTimeUnitStr((String) arr[20]); 295 bean.setExecution((String) arr[21]); 296 break; 297 case GET_COORD_JOB_SUSPEND_KILL: 298 bean = new CoordinatorJobBean(); 299 arr = (Object[]) ret; 300 bean.setId((String) arr[0]); 301 bean.setUser((String) arr[1]); 302 bean.setGroup((String) arr[2]); 303 bean.setAppName((String) arr[3]); 304 bean.setStatusStr((String) arr[4]); 305 bean.setBundleId((String) arr[5]); 306 bean.setAppNamespace((String) arr[6]); 307 bean.setDoneMaterialization((Integer) arr[7]); 308 break; 309 case GET_COORD_JOB_STATUS_PARENTID: 310 bean = new CoordinatorJobBean(); 311 arr = (Object[]) ret; 312 bean.setId((String) parameters[0]); 313 bean.setStatusStr((String) arr[0]); 314 bean.setBundleId((String) arr[1]); 315 break; 316 case GET_COORD_JOBS_CHANGED: 317 bean = (CoordinatorJobBean) ret; 318 break; 319 case GET_COORD_JOBS_OLDER_FOR_MATERILZATION: 320 bean = new CoordinatorJobBean(); 321 bean.setId((String) ret); 322 break; 323 default: 324 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " 325 + namedQuery.name()); 326 } 327 return bean; 328 } 329 330 @Override 331 public CoordinatorJobBean get(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException { 332 JPAService jpaService = Services.get().get(JPAService.class); 333 EntityManager em = jpaService.getEntityManager(); 334 Query query = getSelectQuery(namedQuery, em, parameters); 335 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 336 if (ret == null) { 337 throw new JPAExecutorException(ErrorCode.E0604, query.toString()); 338 } 339 CoordinatorJobBean bean = constructBean(namedQuery, ret, parameters); 340 return bean; 341 } 342 343 @Override 344 public List<CoordinatorJobBean> getList(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException { 345 JPAService jpaService = Services.get().get(JPAService.class); 346 EntityManager em = jpaService.getEntityManager(); 347 Query query = getSelectQuery(namedQuery, em, parameters); 348 List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); 349 List<CoordinatorJobBean> beanList = new ArrayList<CoordinatorJobBean>(); 350 if (retList != null) { 351 for (Object ret : retList) { 352 beanList.add(constructBean(namedQuery, ret, parameters)); 353 } 354 } 355 return beanList; 356 } 357 358 @Override 359 public Object getSingleValue(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException { 360 throw new UnsupportedOperationException(); 361 } 362 363}