001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.executor.jpa; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025 026import javax.persistence.EntityManager; 027import javax.persistence.Query; 028 029import org.apache.oozie.CoordinatorActionBean; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.StringBlob; 032import org.apache.oozie.client.CoordinatorAction; 033import org.apache.oozie.service.JPAService; 034import org.apache.oozie.service.Services; 035import org.apache.oozie.util.DateUtils; 036 037/** 038 * Query Executor that provides API to run query for Coordinator Action 039 */ 040public class CoordActionQueryExecutor extends 041 QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> { 042 043 public enum CoordActionQuery { 044 UPDATE_COORD_ACTION, 045 UPDATE_COORD_ACTION_STATUS_PENDING_TIME, 046 UPDATE_COORD_ACTION_FOR_INPUTCHECK, 047 UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, 048 UPDATE_COORD_ACTION_DEPENDENCIES, 049 UPDATE_COORD_ACTION_FOR_START, 050 UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, 051 UPDATE_COORD_ACTION_RERUN, 052 GET_COORD_ACTION, 053 GET_COORD_ACTION_STATUS, 054 GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID, 055 GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, 056 GET_COORD_ACTIONS_STATUS_UNIGNORED, 057 GET_COORD_ACTIONS_PENDING_COUNT, 058 GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE, 059 GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE, 060 GET_TERMINATED_ACTIONS_FOR_DATES, 061 GET_TERMINATED_ACTION_IDS_FOR_DATES, 062 GET_ACTIVE_ACTIONS_FOR_DATES, 063 GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, 064 GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, 065 GET_COORD_ACTION_FOR_SLA, 066 GET_COORD_ACTION_FOR_INPUTCHECK 067 }; 068 069 private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor(); 070 071 private CoordActionQueryExecutor() { 072 } 073 074 public static QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> getInstance() { 075 return CoordActionQueryExecutor.instance; 076 } 077 078 @Override 079 public Query getUpdateQuery(CoordActionQuery namedQuery, CoordinatorActionBean actionBean, EntityManager em) 080 throws JPAExecutorException { 081 082 Query query = em.createNamedQuery(namedQuery.name()); 083 switch (namedQuery) { 084 case UPDATE_COORD_ACTION: 085 query.setParameter("actionNumber", actionBean.getActionNumber()); 086 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 087 query.setParameter("consoleUrl", actionBean.getConsoleUrl()); 088 query.setParameter("createdConf", actionBean.getCreatedConfBlob()); 089 query.setParameter("errorCode", actionBean.getErrorCode()); 090 query.setParameter("errorMessage", actionBean.getErrorMessage()); 091 query.setParameter("externalStatus", actionBean.getExternalStatus()); 092 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 093 query.setParameter("runConf", actionBean.getRunConfBlob()); 094 query.setParameter("timeOut", actionBean.getTimeOut()); 095 query.setParameter("trackerUri", actionBean.getTrackerUri()); 096 query.setParameter("type", actionBean.getType()); 097 query.setParameter("createdTime", actionBean.getCreatedTimestamp()); 098 query.setParameter("externalId", actionBean.getExternalId()); 099 query.setParameter("jobId", actionBean.getJobId()); 100 query.setParameter("lastModifiedTime", new Date()); 101 query.setParameter("nominalTime", actionBean.getNominalTimestamp()); 102 query.setParameter("slaXml", actionBean.getSlaXmlBlob()); 103 query.setParameter("status", actionBean.getStatus().toString()); 104 query.setParameter("id", actionBean.getId()); 105 break; 106 107 case UPDATE_COORD_ACTION_STATUS_PENDING_TIME: 108 query.setParameter("status", actionBean.getStatus().toString()); 109 query.setParameter("pending", actionBean.getPending()); 110 query.setParameter("lastModifiedTime", new Date()); 111 query.setParameter("id", actionBean.getId()); 112 break; 113 114 case UPDATE_COORD_ACTION_FOR_INPUTCHECK: 115 query.setParameter("status", actionBean.getStatus().toString()); 116 query.setParameter("lastModifiedTime", new Date()); 117 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 118 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 119 query.setParameter("id", actionBean.getId()); 120 break; 121 122 case UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK: 123 query.setParameter("status", actionBean.getStatus().toString()); 124 query.setParameter("lastModifiedTime", new Date()); 125 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 126 query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); 127 query.setParameter("id", actionBean.getId()); 128 break; 129 130 case UPDATE_COORD_ACTION_DEPENDENCIES: 131 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 132 query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); 133 query.setParameter("id", actionBean.getId()); 134 break; 135 136 case UPDATE_COORD_ACTION_FOR_START: 137 query.setParameter("status", actionBean.getStatus().toString()); 138 query.setParameter("lastModifiedTime", new Date()); 139 query.setParameter("runConf", actionBean.getRunConfBlob()); 140 query.setParameter("externalId", actionBean.getExternalId()); 141 query.setParameter("pending", actionBean.getPending()); 142 query.setParameter("errorCode", actionBean.getErrorCode()); 143 query.setParameter("errorMessage", actionBean.getErrorMessage()); 144 query.setParameter("id", actionBean.getId()); 145 break; 146 147 case UPDATE_COORD_ACTION_FOR_MODIFIED_DATE: 148 query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp()); 149 query.setParameter("id", actionBean.getId()); 150 break; 151 152 case UPDATE_COORD_ACTION_RERUN: 153 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 154 query.setParameter("status", actionBean.getStatusStr()); 155 query.setParameter("externalId", actionBean.getExternalId()); 156 query.setParameter("externalStatus", actionBean.getExternalStatus()); 157 query.setParameter("rerunTime", actionBean.getRerunTimestamp()); 158 query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp()); 159 query.setParameter("createdTime", actionBean.getCreatedTimestamp()); 160 query.setParameter("createdConf", actionBean.getCreatedConfBlob()); 161 query.setParameter("runConf", actionBean.getRunConfBlob()); 162 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 163 query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); 164 query.setParameter("errorCode", actionBean.getErrorCode()); 165 query.setParameter("errorMessage", actionBean.getErrorMessage()); 166 query.setParameter("id", actionBean.getId()); 167 break; 168 169 default: 170 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 171 + namedQuery.name()); 172 } 173 return query; 174 } 175 176 @Override 177 public Query getSelectQuery(CoordActionQuery namedQuery, EntityManager em, Object... parameters) 178 throws JPAExecutorException { 179 Query query = em.createNamedQuery(namedQuery.name()); 180 CoordActionQuery caQuery = (CoordActionQuery) namedQuery; 181 switch (caQuery) { 182 case GET_COORD_ACTION: 183 case GET_COORD_ACTION_STATUS: 184 case GET_COORD_ACTION_FOR_SLA: 185 case GET_COORD_ACTION_FOR_INPUTCHECK: 186 query.setParameter("id", parameters[0]); 187 break; 188 case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: 189 query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); 190 break; 191 case GET_COORD_ACTIONS_STATUS_UNIGNORED: 192 query.setParameter("jobId", parameters[0]); 193 break; 194 case GET_COORD_ACTIONS_PENDING_COUNT: 195 query.setParameter("jobId", parameters[0]); 196 break; 197 case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE: 198 query.setParameter("ids", parameters[0]); 199 break; 200 case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE: 201 query.setParameter("jobId", parameters[0]); 202 break; 203 case GET_TERMINATED_ACTIONS_FOR_DATES: 204 case GET_TERMINATED_ACTION_IDS_FOR_DATES: 205 case GET_ACTIVE_ACTIONS_FOR_DATES: 206 query.setParameter("jobId", parameters[0]); 207 query.setParameter("startTime", new Timestamp(((Date) parameters[1]).getTime())); 208 query.setParameter("endTime", new Timestamp(((Date) parameters[2]).getTime())); 209 break; 210 case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN: 211 query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); 212 break; 213 case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN: 214 query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); 215 query.setParameter("currentTime", new Timestamp(new Date().getTime())); 216 break; 217 218 default: 219 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 220 + caQuery.name()); 221 } 222 return query; 223 } 224 225 @Override 226 public int executeUpdate(CoordActionQuery namedQuery, CoordinatorActionBean jobBean) throws JPAExecutorException { 227 JPAService jpaService = Services.get().get(JPAService.class); 228 EntityManager em = jpaService.getEntityManager(); 229 Query query = getUpdateQuery(namedQuery, jobBean, em); 230 int ret = jpaService.executeUpdate(namedQuery.name(), query, em); 231 return ret; 232 } 233 234 @Override 235 public CoordinatorActionBean get(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 236 CoordinatorActionBean bean = getIfExist(namedQuery, parameters); 237 if (bean == null) { 238 throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery, 239 Services.get().get(JPAService.class).getEntityManager(), parameters).toString()); 240 } 241 return bean; 242 } 243 244 @Override 245 public CoordinatorActionBean getIfExist(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 246 JPAService jpaService = Services.get().get(JPAService.class); 247 EntityManager em = jpaService.getEntityManager(); 248 Query query = getSelectQuery(namedQuery, em, parameters); 249 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 250 if (ret == null) { 251 return null; 252 } 253 CoordinatorActionBean bean = constructBean(namedQuery, ret); 254 return bean; 255 } 256 257 @Override 258 public List<CoordinatorActionBean> getList(CoordActionQuery namedQuery, Object... parameters) 259 throws JPAExecutorException { 260 JPAService jpaService = Services.get().get(JPAService.class); 261 EntityManager em = jpaService.getEntityManager(); 262 Query query = getSelectQuery(namedQuery, em, parameters); 263 List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); 264 List<CoordinatorActionBean> beanList = new ArrayList<CoordinatorActionBean>(); 265 if (retList != null) { 266 for (Object ret : retList) { 267 beanList.add(constructBean(namedQuery, ret)); 268 } 269 } 270 return beanList; 271 } 272 273 private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException { 274 CoordinatorActionBean bean; 275 Object[] arr; 276 switch (namedQuery) { 277 case GET_COORD_ACTION: 278 bean = (CoordinatorActionBean) ret; 279 break; 280 case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: 281 bean = new CoordinatorActionBean(); 282 bean.setJobId((String) ret); 283 break; 284 case GET_COORD_ACTION_STATUS: 285 bean = new CoordinatorActionBean(); 286 bean.setStatusStr((String)ret); 287 break; 288 case GET_COORD_ACTIONS_STATUS_UNIGNORED: 289 arr = (Object[]) ret; 290 bean = new CoordinatorActionBean(); 291 bean.setStatusStr((String)arr[0]); 292 bean.setPending((Integer)arr[1]); 293 break; 294 case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE: 295 case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE: 296 arr = (Object[]) ret; 297 bean = new CoordinatorActionBean(); 298 bean.setId((String)arr[0]); 299 bean.setNominalTime((Timestamp)arr[1]); 300 bean.setCreatedTime((Timestamp)arr[2]); 301 bean.setActionXmlBlob((StringBlob)arr[3]); 302 break; 303 case GET_TERMINATED_ACTIONS_FOR_DATES: 304 bean = (CoordinatorActionBean) ret; 305 break; 306 case GET_TERMINATED_ACTION_IDS_FOR_DATES: 307 bean = new CoordinatorActionBean(); 308 bean.setId((String) ret); 309 break; 310 case GET_ACTIVE_ACTIONS_FOR_DATES: 311 arr = (Object[]) ret; 312 bean = new CoordinatorActionBean(); 313 bean.setId((String)arr[0]); 314 bean.setJobId((String)arr[1]); 315 bean.setStatusStr((String) arr[2]); 316 bean.setExternalId((String) arr[3]); 317 bean.setPending((Integer) arr[4]); 318 bean.setNominalTime((Timestamp) arr[5]); 319 bean.setCreatedTime((Timestamp) arr[6]); 320 break; 321 case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN: 322 arr = (Object[]) ret; 323 bean = new CoordinatorActionBean(); 324 bean.setId((String)arr[0]); 325 bean.setJobId((String)arr[1]); 326 bean.setStatusStr((String) arr[2]); 327 bean.setExternalId((String) arr[3]); 328 bean.setPending((Integer) arr[4]); 329 break; 330 case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN: 331 arr = (Object[]) ret; 332 bean = new CoordinatorActionBean(); 333 bean.setId((String)arr[0]); 334 bean.setJobId((String)arr[1]); 335 bean.setStatusStr((String) arr[2]); 336 bean.setExternalId((String) arr[3]); 337 bean.setPushMissingDependenciesBlob((StringBlob) arr[4]); 338 break; 339 case GET_COORD_ACTION_FOR_SLA: 340 arr = (Object[]) ret; 341 bean = new CoordinatorActionBean(); 342 bean.setId((String) arr[0]); 343 bean.setJobId((String) arr[1]); 344 bean.setStatusStr((String) arr[2]); 345 bean.setExternalId((String) arr[3]); 346 bean.setLastModifiedTime((Timestamp) arr[4]); 347 break; 348 case GET_COORD_ACTION_FOR_INPUTCHECK: 349 arr = (Object[]) ret; 350 bean = new CoordinatorActionBean(); 351 bean.setId((String) arr[0]); 352 bean.setActionNumber((Integer) arr[1]); 353 bean.setJobId((String) arr[2]); 354 bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[3])); 355 bean.setRunConfBlob((StringBlob) arr[4]); 356 bean.setNominalTime(DateUtils.toDate((Timestamp) arr[5])); 357 bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[6])); 358 bean.setActionXmlBlob((StringBlob) arr[7]); 359 bean.setMissingDependenciesBlob((StringBlob) arr[8]); 360 bean.setPushMissingDependenciesBlob((StringBlob) arr[9]); 361 bean.setTimeOut((Integer) arr[10]); 362 bean.setExternalId((String) arr[11]); 363 break; 364 365 default: 366 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " 367 + namedQuery.name()); 368 } 369 return bean; 370 } 371 372 @Override 373 public Object getSingleValue(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 374 JPAService jpaService = Services.get().get(JPAService.class); 375 EntityManager em = jpaService.getEntityManager(); 376 Query query = getSelectQuery(namedQuery, em, parameters); 377 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 378 if (ret == null) { 379 throw new JPAExecutorException(ErrorCode.E0604, query.toString()); 380 } 381 return ret; 382 } 383}