001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.executor.jpa; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025 026import javax.persistence.EntityManager; 027import javax.persistence.Query; 028 029import org.apache.oozie.CoordinatorActionBean; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.StringBlob; 032import org.apache.oozie.service.JPAService; 033import org.apache.oozie.service.Services; 034 035/** 036 * Query Executor that provides API to run query for Coordinator Action 037 */ 038public class CoordActionQueryExecutor extends 039 QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> { 040 041 public enum CoordActionQuery { 042 UPDATE_COORD_ACTION, 043 UPDATE_COORD_ACTION_STATUS_PENDING_TIME, 044 UPDATE_COORD_ACTION_FOR_INPUTCHECK, 045 UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, 046 UPDATE_COORD_ACTION_DEPENDENCIES, 047 UPDATE_COORD_ACTION_FOR_START, 048 UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, 049 UPDATE_COORD_ACTION_RERUN, 050 GET_COORD_ACTION, 051 GET_COORD_ACTION_STATUS, 052 GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID, 053 GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, 054 GET_COORD_ACTIONS_STATUS_UNIGNORED, 055 GET_COORD_ACTIONS_PENDING_COUNT, 056 GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE, 057 GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE, 058 GET_TERMINATED_ACTIONS_FOR_DATES, 059 GET_TERMINATED_ACTION_IDS_FOR_DATES, 060 GET_ACTIVE_ACTIONS_FOR_DATES, 061 GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, 062 GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, 063 GET_COORD_ACTION_FOR_SLA 064 }; 065 066 private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor(); 067 068 private CoordActionQueryExecutor() { 069 } 070 071 public static QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> getInstance() { 072 return CoordActionQueryExecutor.instance; 073 } 074 075 @Override 076 public Query getUpdateQuery(CoordActionQuery namedQuery, CoordinatorActionBean actionBean, EntityManager em) 077 throws JPAExecutorException { 078 079 Query query = em.createNamedQuery(namedQuery.name()); 080 switch (namedQuery) { 081 case UPDATE_COORD_ACTION: 082 query.setParameter("actionNumber", actionBean.getActionNumber()); 083 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 084 query.setParameter("consoleUrl", actionBean.getConsoleUrl()); 085 query.setParameter("createdConf", actionBean.getCreatedConfBlob()); 086 query.setParameter("errorCode", actionBean.getErrorCode()); 087 query.setParameter("errorMessage", actionBean.getErrorMessage()); 088 query.setParameter("externalStatus", actionBean.getExternalStatus()); 089 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 090 query.setParameter("runConf", actionBean.getRunConfBlob()); 091 query.setParameter("timeOut", actionBean.getTimeOut()); 092 query.setParameter("trackerUri", actionBean.getTrackerUri()); 093 query.setParameter("type", actionBean.getType()); 094 query.setParameter("createdTime", actionBean.getCreatedTimestamp()); 095 query.setParameter("externalId", actionBean.getExternalId()); 096 query.setParameter("jobId", actionBean.getJobId()); 097 query.setParameter("lastModifiedTime", new Date()); 098 query.setParameter("nominalTime", actionBean.getNominalTimestamp()); 099 query.setParameter("slaXml", actionBean.getSlaXmlBlob()); 100 query.setParameter("status", actionBean.getStatus().toString()); 101 query.setParameter("id", actionBean.getId()); 102 break; 103 104 case UPDATE_COORD_ACTION_STATUS_PENDING_TIME: 105 query.setParameter("status", actionBean.getStatus().toString()); 106 query.setParameter("pending", actionBean.getPending()); 107 query.setParameter("lastModifiedTime", new Date()); 108 query.setParameter("id", actionBean.getId()); 109 break; 110 111 case UPDATE_COORD_ACTION_FOR_INPUTCHECK: 112 query.setParameter("status", actionBean.getStatus().toString()); 113 query.setParameter("lastModifiedTime", new Date()); 114 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 115 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 116 query.setParameter("id", actionBean.getId()); 117 break; 118 119 case UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK: 120 query.setParameter("status", actionBean.getStatus().toString()); 121 query.setParameter("lastModifiedTime", new Date()); 122 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 123 query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); 124 query.setParameter("id", actionBean.getId()); 125 break; 126 127 case UPDATE_COORD_ACTION_DEPENDENCIES: 128 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 129 query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); 130 query.setParameter("id", actionBean.getId()); 131 break; 132 133 case UPDATE_COORD_ACTION_FOR_START: 134 query.setParameter("status", actionBean.getStatus().toString()); 135 query.setParameter("lastModifiedTime", new Date()); 136 query.setParameter("runConf", actionBean.getRunConfBlob()); 137 query.setParameter("externalId", actionBean.getExternalId()); 138 query.setParameter("pending", actionBean.getPending()); 139 query.setParameter("errorCode", actionBean.getErrorCode()); 140 query.setParameter("errorMessage", actionBean.getErrorMessage()); 141 query.setParameter("id", actionBean.getId()); 142 break; 143 144 case UPDATE_COORD_ACTION_FOR_MODIFIED_DATE: 145 query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp()); 146 query.setParameter("id", actionBean.getId()); 147 break; 148 149 case UPDATE_COORD_ACTION_RERUN: 150 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 151 query.setParameter("status", actionBean.getStatusStr()); 152 query.setParameter("externalId", actionBean.getExternalId()); 153 query.setParameter("externalStatus", actionBean.getExternalStatus()); 154 query.setParameter("rerunTime", actionBean.getRerunTimestamp()); 155 query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp()); 156 query.setParameter("createdTime", actionBean.getCreatedTimestamp()); 157 query.setParameter("createdConf", actionBean.getCreatedConfBlob()); 158 query.setParameter("runConf", actionBean.getRunConfBlob()); 159 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 160 query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); 161 query.setParameter("errorCode", actionBean.getErrorCode()); 162 query.setParameter("errorMessage", actionBean.getErrorMessage()); 163 query.setParameter("id", actionBean.getId()); 164 break; 165 166 default: 167 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 168 + namedQuery.name()); 169 } 170 return query; 171 } 172 173 @Override 174 public Query getSelectQuery(CoordActionQuery namedQuery, EntityManager em, Object... parameters) 175 throws JPAExecutorException { 176 Query query = em.createNamedQuery(namedQuery.name()); 177 CoordActionQuery caQuery = (CoordActionQuery) namedQuery; 178 switch (caQuery) { 179 case GET_COORD_ACTION: 180 case GET_COORD_ACTION_STATUS: 181 case GET_COORD_ACTION_FOR_SLA: 182 query.setParameter("id", parameters[0]); 183 break; 184 case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: 185 query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); 186 break; 187 case GET_COORD_ACTIONS_STATUS_UNIGNORED: 188 query.setParameter("jobId", parameters[0]); 189 break; 190 case GET_COORD_ACTIONS_PENDING_COUNT: 191 query.setParameter("jobId", parameters[0]); 192 break; 193 case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE: 194 query.setParameter("ids", parameters[0]); 195 break; 196 case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE: 197 query.setParameter("jobId", parameters[0]); 198 break; 199 case GET_TERMINATED_ACTIONS_FOR_DATES: 200 case GET_TERMINATED_ACTION_IDS_FOR_DATES: 201 case GET_ACTIVE_ACTIONS_FOR_DATES: 202 query.setParameter("jobId", parameters[0]); 203 query.setParameter("startTime", new Timestamp(((Date) parameters[1]).getTime())); 204 query.setParameter("endTime", new Timestamp(((Date) parameters[2]).getTime())); 205 break; 206 case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN: 207 query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); 208 break; 209 case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN: 210 query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); 211 query.setParameter("currentTime", new Timestamp(new Date().getTime())); 212 break; 213 214 default: 215 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 216 + caQuery.name()); 217 } 218 return query; 219 } 220 221 @Override 222 public int executeUpdate(CoordActionQuery namedQuery, CoordinatorActionBean jobBean) throws JPAExecutorException { 223 JPAService jpaService = Services.get().get(JPAService.class); 224 EntityManager em = jpaService.getEntityManager(); 225 Query query = getUpdateQuery(namedQuery, jobBean, em); 226 int ret = jpaService.executeUpdate(namedQuery.name(), query, em); 227 return ret; 228 } 229 230 @Override 231 public CoordinatorActionBean get(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 232 CoordinatorActionBean bean = getIfExist(namedQuery, parameters); 233 if (bean == null) { 234 throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery, 235 Services.get().get(JPAService.class).getEntityManager(), parameters).toString()); 236 } 237 return bean; 238 } 239 240 @Override 241 public CoordinatorActionBean getIfExist(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 242 JPAService jpaService = Services.get().get(JPAService.class); 243 EntityManager em = jpaService.getEntityManager(); 244 Query query = getSelectQuery(namedQuery, em, parameters); 245 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 246 if (ret == null) { 247 return null; 248 } 249 CoordinatorActionBean bean = constructBean(namedQuery, ret); 250 return bean; 251 } 252 253 @Override 254 public List<CoordinatorActionBean> getList(CoordActionQuery namedQuery, Object... parameters) 255 throws JPAExecutorException { 256 JPAService jpaService = Services.get().get(JPAService.class); 257 EntityManager em = jpaService.getEntityManager(); 258 Query query = getSelectQuery(namedQuery, em, parameters); 259 List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); 260 List<CoordinatorActionBean> beanList = new ArrayList<CoordinatorActionBean>(); 261 if (retList != null) { 262 for (Object ret : retList) { 263 beanList.add(constructBean(namedQuery, ret)); 264 } 265 } 266 return beanList; 267 } 268 269 private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException { 270 CoordinatorActionBean bean; 271 Object[] arr; 272 switch (namedQuery) { 273 case GET_COORD_ACTION: 274 bean = (CoordinatorActionBean) ret; 275 break; 276 case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: 277 bean = new CoordinatorActionBean(); 278 bean.setJobId((String) ret); 279 break; 280 case GET_COORD_ACTION_STATUS: 281 bean = new CoordinatorActionBean(); 282 bean.setStatusStr((String)ret); 283 break; 284 case GET_COORD_ACTIONS_STATUS_UNIGNORED: 285 arr = (Object[]) ret; 286 bean = new CoordinatorActionBean(); 287 bean.setStatusStr((String)arr[0]); 288 bean.setPending((Integer)arr[1]); 289 break; 290 case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE: 291 case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE: 292 arr = (Object[]) ret; 293 bean = new CoordinatorActionBean(); 294 bean.setId((String)arr[0]); 295 bean.setNominalTime((Timestamp)arr[1]); 296 bean.setCreatedTime((Timestamp)arr[2]); 297 bean.setActionXmlBlob((StringBlob)arr[3]); 298 break; 299 case GET_TERMINATED_ACTIONS_FOR_DATES: 300 bean = (CoordinatorActionBean) ret; 301 break; 302 case GET_TERMINATED_ACTION_IDS_FOR_DATES: 303 bean = new CoordinatorActionBean(); 304 bean.setId((String) ret); 305 break; 306 case GET_ACTIVE_ACTIONS_FOR_DATES: 307 arr = (Object[]) ret; 308 bean = new CoordinatorActionBean(); 309 bean.setId((String)arr[0]); 310 bean.setJobId((String)arr[1]); 311 bean.setStatusStr((String) arr[2]); 312 bean.setExternalId((String) arr[3]); 313 bean.setPending((Integer) arr[4]); 314 bean.setNominalTime((Timestamp) arr[5]); 315 bean.setCreatedTime((Timestamp) arr[6]); 316 break; 317 case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN: 318 arr = (Object[]) ret; 319 bean = new CoordinatorActionBean(); 320 bean.setId((String)arr[0]); 321 bean.setJobId((String)arr[1]); 322 bean.setStatusStr((String) arr[2]); 323 bean.setExternalId((String) arr[3]); 324 bean.setPending((Integer) arr[4]); 325 break; 326 case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN: 327 arr = (Object[]) ret; 328 bean = new CoordinatorActionBean(); 329 bean.setId((String)arr[0]); 330 bean.setJobId((String)arr[1]); 331 bean.setStatusStr((String) arr[2]); 332 bean.setExternalId((String) arr[3]); 333 bean.setPushMissingDependenciesBlob((StringBlob) arr[4]); 334 break; 335 case GET_COORD_ACTION_FOR_SLA: 336 arr = (Object[]) ret; 337 bean = new CoordinatorActionBean(); 338 bean.setId((String) arr[0]); 339 bean.setJobId((String) arr[1]); 340 bean.setStatusStr((String) arr[2]); 341 bean.setExternalId((String) arr[3]); 342 bean.setLastModifiedTime((Timestamp) arr[4]); 343 break; 344 345 default: 346 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " 347 + namedQuery.name()); 348 } 349 return bean; 350 } 351 352 @Override 353 public Object getSingleValue(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 354 JPAService jpaService = Services.get().get(JPAService.class); 355 EntityManager em = jpaService.getEntityManager(); 356 Query query = getSelectQuery(namedQuery, em, parameters); 357 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 358 if (ret == null) { 359 throw new JPAExecutorException(ErrorCode.E0604, query.toString()); 360 } 361 return ret; 362 } 363}