001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.executor.jpa; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025 026import javax.persistence.EntityManager; 027import javax.persistence.Query; 028 029import org.apache.oozie.CoordinatorJobBean; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.StringBlob; 032import org.apache.oozie.service.JPAService; 033import org.apache.oozie.service.Services; 034import org.apache.oozie.util.DateUtils; 035 036/** 037 * Query Executor that provides API to run query for Coordinator Job 038 */ 039 040public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, CoordJobQueryExecutor.CoordJobQuery> { 041 042 public enum CoordJobQuery { 043 UPDATE_COORD_JOB, 044 UPDATE_COORD_JOB_STATUS, 045 UPDATE_COORD_JOB_BUNDLEID, 046 UPDATE_COORD_JOB_APPNAMESPACE, 047 UPDATE_COORD_JOB_STATUS_PENDING, 048 UPDATE_COORD_JOB_BUNDLEID_APPNAMESPACE_PAUSETIME, 049 UPDATE_COORD_JOB_STATUS_MODTIME, 050 UPDATE_COORD_JOB_STATUS_PENDING_MODTIME, 051 UPDATE_COORD_JOB_LAST_MODIFIED_TIME, 052 UPDATE_COORD_JOB_STATUS_PENDING_TIME, 053 UPDATE_COORD_JOB_MATERIALIZE, 054 UPDATE_COORD_JOB_CHANGE, 055 UPDATE_COORD_JOB_CONF, 056 UPDATE_COORD_JOB_XML, 057 GET_COORD_JOB, 058 GET_COORD_JOB_USER_APPNAME, 059 GET_COORD_JOB_INPUT_CHECK, 060 GET_COORD_JOB_ACTION_READY, 061 GET_COORD_JOB_ACTION_KILL, 062 GET_COORD_JOB_MATERIALIZE, 063 GET_COORD_JOB_SUSPEND_KILL, 064 GET_COORD_JOB_STATUS, 065 GET_COORD_JOB_STATUS_PARENTID, 066 GET_COORD_JOBS_CHANGED, 067 GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION, 068 GET_COORD_FOR_ABANDONEDCHECK, 069 GET_COORD_IDS_FOR_STATUS_TRANSIT, 070 GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, 071 GET_COORD_JOBS_WITH_PARENT_ID, 072 GET_COORD_JOB_CONF, 073 GET_COORD_JOB_XML 074 }; 075 076 private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor(); 077 078 private CoordJobQueryExecutor() { 079 } 080 081 public static CoordJobQueryExecutor getInstance() { 082 return CoordJobQueryExecutor.instance; 083 } 084 085 @Override 086 public Query getUpdateQuery(CoordJobQuery namedQuery, CoordinatorJobBean cjBean, EntityManager em) 087 throws JPAExecutorException { 088 Query query = em.createNamedQuery(namedQuery.name()); 089 switch (namedQuery) { 090 case UPDATE_COORD_JOB: 091 query.setParameter("appName", cjBean.getAppName()); 092 query.setParameter("appPath", cjBean.getAppPath()); 093 query.setParameter("concurrency", cjBean.getConcurrency()); 094 query.setParameter("conf", cjBean.getConfBlob()); 095 query.setParameter("externalId", cjBean.getExternalId()); 096 query.setParameter("frequency", cjBean.getFrequency()); 097 query.setParameter("lastActionNumber", cjBean.getLastActionNumber()); 098 query.setParameter("timeOut", cjBean.getTimeout()); 099 query.setParameter("timeZone", cjBean.getTimeZone()); 100 query.setParameter("createdTime", cjBean.getCreatedTimestamp()); 101 query.setParameter("endTime", cjBean.getEndTimestamp()); 102 query.setParameter("execution", cjBean.getExecution()); 103 query.setParameter("jobXml", cjBean.getJobXmlBlob()); 104 query.setParameter("lastAction", cjBean.getLastActionTimestamp()); 105 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 106 query.setParameter("nextMaterializedTime", cjBean.getNextMaterializedTimestamp()); 107 query.setParameter("origJobXml", cjBean.getOrigJobXmlBlob()); 108 query.setParameter("slaXml", cjBean.getSlaXmlBlob()); 109 query.setParameter("startTime", cjBean.getStartTimestamp()); 110 query.setParameter("status", cjBean.getStatus().toString()); 111 query.setParameter("timeUnit", cjBean.getTimeUnitStr()); 112 query.setParameter("appNamespace", cjBean.getAppNamespace()); 113 query.setParameter("bundleId", cjBean.getBundleId()); 114 query.setParameter("matThrottling", cjBean.getMatThrottling()); 115 query.setParameter("id", cjBean.getId()); 116 break; 117 case UPDATE_COORD_JOB_STATUS: 118 query.setParameter("status", cjBean.getStatus().toString()); 119 query.setParameter("id", cjBean.getId()); 120 break; 121 case UPDATE_COORD_JOB_BUNDLEID: 122 query.setParameter("bundleId", cjBean.getBundleId()); 123 query.setParameter("id", cjBean.getId()); 124 break; 125 case UPDATE_COORD_JOB_APPNAMESPACE: 126 query.setParameter("appNamespace", cjBean.getAppNamespace()); 127 query.setParameter("id", cjBean.getId()); 128 break; 129 case UPDATE_COORD_JOB_STATUS_PENDING: 130 query.setParameter("status", cjBean.getStatus().toString()); 131 query.setParameter("pending", cjBean.isPending() ? 1 : 0); 132 query.setParameter("id", cjBean.getId()); 133 break; 134 case UPDATE_COORD_JOB_BUNDLEID_APPNAMESPACE_PAUSETIME: 135 query.setParameter("bundleId", cjBean.getBundleId()); 136 query.setParameter("appNamespace", cjBean.getAppNamespace()); 137 query.setParameter("pauseTime", cjBean.getPauseTimestamp()); 138 query.setParameter("id", cjBean.getId()); 139 break; 140 case UPDATE_COORD_JOB_STATUS_MODTIME: 141 query.setParameter("status", cjBean.getStatus().toString()); 142 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 143 query.setParameter("id", cjBean.getId()); 144 break; 145 case UPDATE_COORD_JOB_STATUS_PENDING_MODTIME: 146 query.setParameter("status", cjBean.getStatus().toString()); 147 query.setParameter("pending", cjBean.isPending() ? 1 : 0); 148 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 149 query.setParameter("id", cjBean.getId()); 150 break; 151 case UPDATE_COORD_JOB_LAST_MODIFIED_TIME: 152 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 153 query.setParameter("id", cjBean.getId()); 154 break; 155 case UPDATE_COORD_JOB_STATUS_PENDING_TIME: 156 query.setParameter("status", cjBean.getStatus().toString()); 157 query.setParameter("pending", cjBean.isPending() ? 1 : 0); 158 query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0); 159 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 160 query.setParameter("suspendedTime", cjBean.getSuspendedTimestamp()); 161 query.setParameter("id", cjBean.getId()); 162 break; 163 case UPDATE_COORD_JOB_MATERIALIZE: 164 query.setParameter("status", cjBean.getStatus().toString()); 165 query.setParameter("pending", cjBean.isPending() ? 1 : 0); 166 query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0); 167 query.setParameter("lastActionTime", cjBean.getLastActionTimestamp()); 168 query.setParameter("lastActionNumber", cjBean.getLastActionNumber()); 169 query.setParameter("nextMatdTime", cjBean.getNextMaterializedTimestamp()); 170 query.setParameter("id", cjBean.getId()); 171 break; 172 case UPDATE_COORD_JOB_CHANGE: 173 query.setParameter("endTime", cjBean.getEndTimestamp()); 174 query.setParameter("status", cjBean.getStatus().toString()); 175 query.setParameter("pending", cjBean.isPending() ? 1 : 0); 176 query.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0); 177 query.setParameter("concurrency", cjBean.getConcurrency()); 178 query.setParameter("pauseTime", cjBean.getPauseTimestamp()); 179 query.setParameter("lastActionNumber", cjBean.getLastActionNumber()); 180 query.setParameter("lastActionTime", cjBean.getLastActionTimestamp()); 181 query.setParameter("nextMatdTime", cjBean.getNextMaterializedTimestamp()); 182 query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); 183 query.setParameter("id", cjBean.getId()); 184 break; 185 case UPDATE_COORD_JOB_CONF: 186 query.setParameter("conf", cjBean.getConfBlob()); 187 query.setParameter("id", cjBean.getId()); 188 break; 189 case UPDATE_COORD_JOB_XML: 190 query.setParameter("jobXml", cjBean.getJobXmlBlob()); 191 query.setParameter("id", cjBean.getId()); 192 break; 193 194 default: 195 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 196 + namedQuery.name()); 197 } 198 return query; 199 } 200 201 @Override 202 public Query getSelectQuery(CoordJobQuery namedQuery, EntityManager em, Object... parameters) 203 throws JPAExecutorException { 204 Query query = em.createNamedQuery(namedQuery.name()); 205 switch (namedQuery) { 206 case GET_COORD_JOB: 207 case GET_COORD_JOB_USER_APPNAME: 208 case GET_COORD_JOB_INPUT_CHECK: 209 case GET_COORD_JOB_ACTION_READY: 210 case GET_COORD_JOB_ACTION_KILL: 211 case GET_COORD_JOB_MATERIALIZE: 212 case GET_COORD_JOB_SUSPEND_KILL: 213 case GET_COORD_JOB_STATUS: 214 case GET_COORD_JOB_STATUS_PARENTID: 215 case GET_COORD_JOB_CONF: 216 case GET_COORD_JOB_XML: 217 query.setParameter("id", parameters[0]); 218 break; 219 case GET_COORD_JOBS_CHANGED: 220 query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime())); 221 break; 222 case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION: 223 query.setParameter("matTime", new Timestamp(((Date)parameters[0]).getTime())); 224 int limit = (Integer) parameters[1]; 225 if (limit > 0) { 226 query.setMaxResults(limit); 227 } 228 break; 229 case GET_COORD_FOR_ABANDONEDCHECK: 230 query.setParameter(1, (Integer) parameters[0]); 231 query.setParameter(2, (Timestamp) parameters[1]); 232 break; 233 234 case GET_COORD_IDS_FOR_STATUS_TRANSIT: 235 query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); 236 break; 237 case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID: 238 query.setParameter("appName", parameters[0]); 239 query.setParameter("bundleId", parameters[1]); 240 break; 241 case GET_COORD_JOBS_WITH_PARENT_ID: 242 query.setParameter("parentId", parameters[0]); 243 break; 244 default: 245 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 246 + namedQuery.name()); 247 } 248 return query; 249 } 250 251 @Override 252 public int executeUpdate(CoordJobQuery namedQuery, CoordinatorJobBean jobBean) throws JPAExecutorException { 253 JPAService jpaService = Services.get().get(JPAService.class); 254 EntityManager em = jpaService.getEntityManager(); 255 Query query = getUpdateQuery(namedQuery, jobBean, em); 256 int ret = jpaService.executeUpdate(namedQuery.name(), query, em); 257 return ret; 258 } 259 260 private CoordinatorJobBean constructBean(CoordJobQuery namedQuery, Object ret, Object... parameters) 261 throws JPAExecutorException { 262 CoordinatorJobBean bean; 263 Object[] arr; 264 switch (namedQuery) { 265 case GET_COORD_JOB: 266 bean = (CoordinatorJobBean) ret; 267 break; 268 case GET_COORD_JOB_USER_APPNAME: 269 bean = new CoordinatorJobBean(); 270 arr = (Object[]) ret; 271 bean.setUser((String) arr[0]); 272 bean.setAppName((String) arr[1]); 273 break; 274 case GET_COORD_JOB_INPUT_CHECK: 275 bean = new CoordinatorJobBean(); 276 arr = (Object[]) ret; 277 bean.setUser((String) arr[0]); 278 bean.setAppName((String) arr[1]); 279 bean.setStatusStr((String) arr[2]); 280 bean.setAppNamespace((String) arr[3]); 281 bean.setExecution((String) arr[4]); 282 bean.setFrequency((String) arr[5]); 283 bean.setTimeUnitStr((String) arr[6]); 284 bean.setTimeZone((String) arr[7]); 285 bean.setStartTime(DateUtils.toDate((Timestamp) arr[8])); 286 bean.setEndTime(DateUtils.toDate((Timestamp) arr[9])); 287 bean.setJobXmlBlob((StringBlob) arr[10]); 288 break; 289 case GET_COORD_JOB_ACTION_READY: 290 bean = new CoordinatorJobBean(); 291 arr = (Object[]) ret; 292 bean.setId((String) arr[0]); 293 bean.setUser((String) arr[1]); 294 bean.setGroup((String) arr[2]); 295 bean.setAppName((String) arr[3]); 296 bean.setStatusStr((String) arr[4]); 297 bean.setExecution((String) arr[5]); 298 bean.setConcurrency((Integer) arr[6]); 299 bean.setFrequency((String) arr[7]); 300 bean.setTimeUnitStr((String) arr[8]); 301 bean.setTimeZone((String) arr[9]); 302 bean.setStartTime(DateUtils.toDate((Timestamp) arr[10])); 303 bean.setEndTime(DateUtils.toDate((Timestamp) arr[11])); 304 bean.setJobXmlBlob((StringBlob) arr[12]); 305 break; 306 case GET_COORD_JOB_ACTION_KILL: 307 bean = new CoordinatorJobBean(); 308 arr = (Object[]) ret; 309 bean.setId((String) arr[0]); 310 bean.setUser((String) arr[1]); 311 bean.setGroup((String) arr[2]); 312 bean.setAppName((String) arr[3]); 313 bean.setStatusStr((String) arr[4]); 314 break; 315 case GET_COORD_JOB_MATERIALIZE: 316 bean = new CoordinatorJobBean(); 317 arr = (Object[]) ret; 318 bean.setId((String) arr[0]); 319 bean.setUser((String) arr[1]); 320 bean.setGroup((String) arr[2]); 321 bean.setAppName((String) arr[3]); 322 bean.setStatusStr((String) arr[4]); 323 bean.setFrequency((String) arr[5]); 324 bean.setMatThrottling((Integer) arr[6]); 325 bean.setTimeout((Integer) arr[7]); 326 bean.setTimeZone((String) arr[8]); 327 bean.setStartTime(DateUtils.toDate((Timestamp) arr[9])); 328 bean.setEndTime(DateUtils.toDate((Timestamp) arr[10])); 329 bean.setPauseTime(DateUtils.toDate((Timestamp) arr[11])); 330 bean.setNextMaterializedTime(DateUtils.toDate((Timestamp) arr[12])); 331 bean.setLastActionTime(DateUtils.toDate((Timestamp) arr[13])); 332 bean.setLastActionNumber((Integer) arr[14]); 333 bean.setDoneMaterialization((Integer) arr[15]); 334 bean.setBundleId((String) arr[16]); 335 bean.setConfBlob((StringBlob) arr[17]); 336 bean.setJobXmlBlob((StringBlob) arr[18]); 337 bean.setAppNamespace((String) arr[19]); 338 bean.setTimeUnitStr((String) arr[20]); 339 bean.setExecution((String) arr[21]); 340 break; 341 case GET_COORD_JOB_SUSPEND_KILL: 342 bean = new CoordinatorJobBean(); 343 arr = (Object[]) ret; 344 bean.setId((String) arr[0]); 345 bean.setUser((String) arr[1]); 346 bean.setGroup((String) arr[2]); 347 bean.setAppName((String) arr[3]); 348 bean.setStatusStr((String) arr[4]); 349 bean.setBundleId((String) arr[5]); 350 bean.setAppNamespace((String) arr[6]); 351 bean.setDoneMaterialization((Integer) arr[7]); 352 break; 353 case GET_COORD_JOB_STATUS: 354 bean = new CoordinatorJobBean(); 355 bean.setId((String) parameters[0]); 356 bean.setStatusStr((String) ret); 357 break; 358 case GET_COORD_JOB_STATUS_PARENTID: 359 bean = new CoordinatorJobBean(); 360 arr = (Object[]) ret; 361 bean.setId((String) parameters[0]); 362 bean.setStatusStr((String) arr[0]); 363 bean.setBundleId((String) arr[1]); 364 break; 365 case GET_COORD_JOBS_CHANGED: 366 bean = (CoordinatorJobBean) ret; 367 break; 368 case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION: 369 bean = new CoordinatorJobBean(); 370 bean.setId((String) ret); 371 break; 372 case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID: 373 bean = new CoordinatorJobBean(); 374 bean.setId((String) ret); 375 break; 376 case GET_COORD_JOBS_WITH_PARENT_ID: 377 bean = new CoordinatorJobBean(); 378 bean.setId((String) ret); 379 break; 380 case GET_COORD_FOR_ABANDONEDCHECK: 381 bean = new CoordinatorJobBean(); 382 arr = (Object[]) ret; 383 bean.setId((String) arr[0]); 384 bean.setUser((String) arr[1]); 385 bean.setGroup((String) arr[2]); 386 bean.setAppName((String) arr[3]); 387 break; 388 case GET_COORD_IDS_FOR_STATUS_TRANSIT: 389 bean = new CoordinatorJobBean(); 390 bean.setId((String) ret); 391 break; 392 case GET_COORD_JOB_CONF: 393 bean = new CoordinatorJobBean(); 394 bean.setConfBlob((StringBlob) ret); 395 break; 396 case GET_COORD_JOB_XML: 397 bean = new CoordinatorJobBean(); 398 bean.setJobXmlBlob((StringBlob) ret); 399 break; 400 401 default: 402 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " 403 + namedQuery.name()); 404 } 405 return bean; 406 } 407 408 @Override 409 public CoordinatorJobBean get(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException { 410 CoordinatorJobBean bean = getIfExist(namedQuery, parameters); 411 if (bean == null) { 412 throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery, 413 Services.get().get(JPAService.class).getEntityManager(), parameters).toString()); 414 } 415 return bean; 416 } 417 418 @Override 419 public List<CoordinatorJobBean> getList(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException { 420 JPAService jpaService = Services.get().get(JPAService.class); 421 EntityManager em = jpaService.getEntityManager(); 422 Query query = getSelectQuery(namedQuery, em, parameters); 423 List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); 424 List<CoordinatorJobBean> beanList = new ArrayList<CoordinatorJobBean>(); 425 if (retList != null) { 426 for (Object ret : retList) { 427 beanList.add(constructBean(namedQuery, ret, parameters)); 428 } 429 } 430 return beanList; 431 } 432 433 @Override 434 public Object getSingleValue(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException { 435 throw new UnsupportedOperationException(); 436 } 437 438 @Override 439 public CoordinatorJobBean getIfExist(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException { 440 JPAService jpaService = Services.get().get(JPAService.class); 441 EntityManager em = jpaService.getEntityManager(); 442 Query query = getSelectQuery(namedQuery, em, parameters); 443 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 444 if (ret == null) { 445 return null; 446 } 447 CoordinatorJobBean bean = constructBean(namedQuery, ret, parameters); 448 return bean; 449 } 450}