001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.executor.jpa; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.List; 024 025import javax.persistence.EntityManager; 026import javax.persistence.Query; 027 028import org.apache.oozie.ErrorCode; 029import org.apache.oozie.StringBlob; 030import org.apache.oozie.WorkflowActionBean; 031import org.apache.oozie.service.JPAService; 032import org.apache.oozie.service.Services; 033import org.apache.oozie.util.DateUtils; 034 035/** 036 * Query Executor that provides API to run query for Workflow Action 037 */ 038public class WorkflowActionQueryExecutor extends 039 QueryExecutor<WorkflowActionBean, WorkflowActionQueryExecutor.WorkflowActionQuery> { 040 041 public enum WorkflowActionQuery { 042 UPDATE_ACTION, 043 UPDATE_ACTION_FOR_LAST_CHECKED_TIME, 044 UPDATE_ACTION_START, 045 UPDATE_ACTION_CHECK, 046 UPDATE_ACTION_END, 047 UPDATE_ACTION_PENDING, 048 UPDATE_ACTION_STATUS_PENDING, 049 UPDATE_ACTION_PENDING_TRANS, 050 UPDATE_ACTION_PENDING_TRANS_ERROR, 051 GET_ACTION, 052 GET_ACTION_ID_TYPE_LASTCHECK, 053 GET_ACTION_FAIL, 054 GET_ACTION_SIGNAL, 055 GET_ACTION_CHECK, 056 GET_ACTION_END, 057 GET_ACTION_COMPLETED, 058 GET_RUNNING_ACTIONS, 059 GET_PENDING_ACTIONS, 060 GET_ACTIONS_FOR_WORKFLOW_RERUN, 061 GET_ACTION_FOR_SLA 062 }; 063 064 private static WorkflowActionQueryExecutor instance = new WorkflowActionQueryExecutor(); 065 066 private WorkflowActionQueryExecutor() { 067 } 068 069 public static QueryExecutor<WorkflowActionBean, WorkflowActionQuery> getInstance() { 070 return WorkflowActionQueryExecutor.instance; 071 } 072 073 @Override 074 public Query getUpdateQuery(WorkflowActionQuery namedQuery, WorkflowActionBean actionBean, EntityManager em) 075 throws JPAExecutorException { 076 Query query = em.createNamedQuery(namedQuery.name()); 077 switch (namedQuery) { 078 case UPDATE_ACTION: 079 query.setParameter("conf", actionBean.getConfBlob()); 080 query.setParameter("consoleUrl", actionBean.getConsoleUrl()); 081 query.setParameter("data", actionBean.getDataBlob()); 082 query.setParameter("stats", actionBean.getStatsBlob()); 083 query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob()); 084 query.setParameter("errorCode", actionBean.getErrorCode()); 085 query.setParameter("errorMessage", actionBean.getErrorMessage()); 086 query.setParameter("externalId", actionBean.getExternalId()); 087 query.setParameter("externalStatus", actionBean.getExternalStatus()); 088 query.setParameter("name", actionBean.getName()); 089 query.setParameter("cred", actionBean.getCred()); 090 query.setParameter("retries", actionBean.getRetries()); 091 query.setParameter("trackerUri", actionBean.getTrackerUri()); 092 query.setParameter("transition", actionBean.getTransition()); 093 query.setParameter("type", actionBean.getType()); 094 query.setParameter("endTime", actionBean.getEndTimestamp()); 095 query.setParameter("executionPath", actionBean.getExecutionPath()); 096 query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp()); 097 query.setParameter("logToken", actionBean.getLogToken()); 098 query.setParameter("pending", actionBean.getPending()); 099 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 100 query.setParameter("signalValue", actionBean.getSignalValue()); 101 query.setParameter("slaXml", actionBean.getSlaXmlBlob()); 102 query.setParameter("startTime", actionBean.getStartTimestamp()); 103 query.setParameter("status", actionBean.getStatusStr()); 104 query.setParameter("wfId", actionBean.getWfId()); 105 query.setParameter("id", actionBean.getId()); 106 break; 107 case UPDATE_ACTION_FOR_LAST_CHECKED_TIME: 108 query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp()); 109 query.setParameter("id", actionBean.getId()); 110 break; 111 case UPDATE_ACTION_PENDING: 112 query.setParameter("pending", actionBean.getPending()); 113 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 114 query.setParameter("executionPath", actionBean.getExecutionPath()); 115 query.setParameter("id", actionBean.getId()); 116 break; 117 case UPDATE_ACTION_STATUS_PENDING: 118 query.setParameter("status", actionBean.getStatus().toString()); 119 query.setParameter("pending", actionBean.getPending()); 120 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 121 query.setParameter("id", actionBean.getId()); 122 break; 123 case UPDATE_ACTION_PENDING_TRANS: 124 query.setParameter("transition", actionBean.getTransition()); 125 query.setParameter("pending", actionBean.getPending()); 126 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 127 query.setParameter("id", actionBean.getId()); 128 break; 129 case UPDATE_ACTION_PENDING_TRANS_ERROR: 130 query.setParameter("transition", actionBean.getTransition()); 131 query.setParameter("pending", actionBean.getPending()); 132 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 133 query.setParameter("errorCode", actionBean.getErrorCode()); 134 query.setParameter("errorMessage", actionBean.getErrorMessage()); 135 query.setParameter("status", actionBean.getStatusStr()); 136 query.setParameter("id", actionBean.getId()); 137 break; 138 case UPDATE_ACTION_START: 139 query.setParameter("startTime", actionBean.getStartTimestamp()); 140 query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob()); 141 query.setParameter("conf", actionBean.getConfBlob()); 142 query.setParameter("errorCode", actionBean.getErrorCode()); 143 query.setParameter("errorMessage", actionBean.getErrorMessage()); 144 query.setParameter("externalId", actionBean.getExternalId()); 145 query.setParameter("trackerUri", actionBean.getTrackerUri()); 146 query.setParameter("consoleUrl", actionBean.getConsoleUrl()); 147 query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp()); 148 query.setParameter("status", actionBean.getStatus().toString()); 149 query.setParameter("externalStatus", actionBean.getExternalStatus()); 150 query.setParameter("data", actionBean.getDataBlob()); 151 query.setParameter("retries", actionBean.getRetries()); 152 query.setParameter("pending", actionBean.getPending()); 153 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 154 query.setParameter("userRetryCount", actionBean.getUserRetryCount()); 155 query.setParameter("id", actionBean.getId()); 156 break; 157 case UPDATE_ACTION_CHECK: 158 query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob()); 159 query.setParameter("externalStatus", actionBean.getExternalStatus()); 160 query.setParameter("status", actionBean.getStatus().toString()); 161 query.setParameter("data", actionBean.getDataBlob()); 162 query.setParameter("pending", actionBean.getPending()); 163 query.setParameter("errorCode", actionBean.getErrorCode()); 164 query.setParameter("errorMessage", actionBean.getErrorMessage()); 165 query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp()); 166 query.setParameter("retries", actionBean.getRetries()); 167 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 168 query.setParameter("startTime", actionBean.getStartTimestamp()); 169 query.setParameter("stats", actionBean.getStatsBlob()); 170 query.setParameter("userRetryCount", actionBean.getUserRetryCount()); 171 query.setParameter("id", actionBean.getId()); 172 break; 173 case UPDATE_ACTION_END: 174 query.setParameter("errorCode", actionBean.getErrorCode()); 175 query.setParameter("errorMessage", actionBean.getErrorMessage()); 176 query.setParameter("retries", actionBean.getRetries()); 177 query.setParameter("status", actionBean.getStatus().toString()); 178 query.setParameter("endTime", actionBean.getEndTimestamp()); 179 query.setParameter("pending", actionBean.getPending()); 180 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 181 query.setParameter("signalValue", actionBean.getSignalValue()); 182 query.setParameter("userRetryCount", actionBean.getUserRetryCount()); 183 query.setParameter("externalStatus", actionBean.getExternalStatus()); 184 query.setParameter("stats", actionBean.getStatsBlob()); 185 query.setParameter("id", actionBean.getId()); 186 break; 187 default: 188 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 189 + namedQuery.name()); 190 } 191 return query; 192 } 193 194 @Override 195 public Query getSelectQuery(WorkflowActionQuery namedQuery, EntityManager em, Object... parameters) 196 throws JPAExecutorException { 197 Query query = em.createNamedQuery(namedQuery.name()); 198 switch (namedQuery) { 199 case GET_ACTION: 200 case GET_ACTION_ID_TYPE_LASTCHECK: 201 case GET_ACTION_FAIL: 202 case GET_ACTION_SIGNAL: 203 case GET_ACTION_CHECK: 204 case GET_ACTION_END: 205 case GET_ACTION_COMPLETED: 206 case GET_ACTION_FOR_SLA: 207 query.setParameter("id", parameters[0]); 208 break; 209 case GET_RUNNING_ACTIONS: 210 Timestamp ts = new Timestamp(System.currentTimeMillis() - (Integer) parameters[0] * 1000); 211 query.setParameter("lastCheckTime", ts); 212 break; 213 case GET_PENDING_ACTIONS: 214 Long minimumPendingAgeSecs = (Long) parameters[0]; 215 Timestamp pts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000); 216 Timestamp createdTimeInterval = new Timestamp((Long) parameters[1]); 217 query.setParameter("pendingAge", pts); 218 query.setParameter("createdTime", createdTimeInterval); 219 break; 220 case GET_ACTIONS_FOR_WORKFLOW_RERUN: 221 query.setParameter("wfId", parameters[0]); 222 break; 223 default: 224 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 225 + namedQuery.name()); 226 } 227 return query; 228 } 229 230 @Override 231 public int executeUpdate(WorkflowActionQuery namedQuery, WorkflowActionBean actionBean) throws JPAExecutorException { 232 JPAService jpaService = Services.get().get(JPAService.class); 233 EntityManager em = jpaService.getEntityManager(); 234 Query query = getUpdateQuery(namedQuery, actionBean, em); 235 int ret = jpaService.executeUpdate(namedQuery.name(), query, em); 236 return ret; 237 } 238 239 private WorkflowActionBean constructBean(WorkflowActionQuery namedQuery, Object ret) throws JPAExecutorException { 240 WorkflowActionBean bean; 241 Object[] arr; 242 switch (namedQuery) { 243 case GET_ACTION: 244 bean = (WorkflowActionBean) ret; 245 break; 246 case GET_ACTION_ID_TYPE_LASTCHECK: 247 bean = new WorkflowActionBean(); 248 arr = (Object[]) ret; 249 bean.setId((String) arr[0]); 250 bean.setType((String) arr[1]); 251 bean.setLastCheckTime(DateUtils.toDate((Timestamp) arr[2])); 252 break; 253 case GET_ACTION_FAIL: 254 bean = new WorkflowActionBean(); 255 arr = (Object[]) ret; 256 bean.setId((String) arr[0]); 257 bean.setJobId((String) arr[1]); 258 bean.setName((String) arr[2]); 259 bean.setStatusStr((String) arr[3]); 260 bean.setPending((Integer) arr[4]); 261 bean.setType((String) arr[5]); 262 bean.setLogToken((String) arr[6]); 263 bean.setTransition((String) arr[7]); 264 bean.setErrorInfo((String) arr[8], (String) arr[9]); 265 break; 266 case GET_ACTION_SIGNAL: 267 bean = new WorkflowActionBean(); 268 arr = (Object[]) ret; 269 bean.setId((String) arr[0]); 270 bean.setJobId((String) arr[1]); 271 bean.setName((String) arr[2]); 272 bean.setStatusStr((String) arr[3]); 273 bean.setPending((Integer) arr[4]); 274 bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5])); 275 bean.setType((String) arr[6]); 276 bean.setLogToken((String) arr[7]); 277 bean.setTransition((String) arr[8]); 278 bean.setErrorInfo((String) arr[9], (String) arr[10]); 279 bean.setExecutionPath((String) arr[11]); 280 bean.setSignalValue((String) arr[12]); 281 bean.setSlaXmlBlob((StringBlob) arr[13]); 282 bean.setExternalId((String) arr[14]); 283 break; 284 case GET_ACTION_CHECK: 285 bean = new WorkflowActionBean(); 286 arr = (Object[]) ret; 287 bean.setId((String) arr[0]); 288 bean.setJobId((String) arr[1]); 289 bean.setName((String) arr[2]); 290 bean.setStatusStr((String) arr[3]); 291 bean.setPending((Integer) arr[4]); 292 bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5])); 293 bean.setType((String) arr[6]); 294 bean.setLogToken((String) arr[7]); 295 bean.setTransition((String) arr[8]); 296 bean.setRetries((Integer) arr[9]); 297 bean.setUserRetryCount((Integer) arr[10]); 298 bean.setUserRetryMax((Integer) arr[11]); 299 bean.setUserRetryInterval((Integer) arr[12]); 300 bean.setTrackerUri((String) arr[13]); 301 bean.setStartTime(DateUtils.toDate((Timestamp) arr[14])); 302 bean.setEndTime(DateUtils.toDate((Timestamp) arr[15])); 303 bean.setLastCheckTime(DateUtils.toDate((Timestamp) arr[16])); 304 bean.setErrorInfo((String) arr[17], (String) arr[18]); 305 bean.setExternalId((String) arr[19]); 306 bean.setExternalStatus((String) arr[20]); 307 bean.setExternalChildIDsBlob((StringBlob) arr[21]); 308 bean.setConfBlob((StringBlob) arr[22]); 309 break; 310 case GET_ACTION_END: 311 bean = new WorkflowActionBean(); 312 arr = (Object[]) ret; 313 bean.setId((String) arr[0]); 314 bean.setJobId((String) arr[1]); 315 bean.setName((String) arr[2]); 316 bean.setStatusStr((String) arr[3]); 317 bean.setPending((Integer) arr[4]); 318 bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5])); 319 bean.setType((String) arr[6]); 320 bean.setLogToken((String) arr[7]); 321 bean.setTransition((String) arr[8]); 322 bean.setRetries((Integer) arr[9]); 323 bean.setTrackerUri((String) arr[10]); 324 bean.setUserRetryCount((Integer) arr[11]); 325 bean.setUserRetryMax((Integer) arr[12]); 326 bean.setUserRetryInterval((Integer) arr[13]); 327 bean.setStartTime(DateUtils.toDate((Timestamp) arr[14])); 328 bean.setEndTime(DateUtils.toDate((Timestamp) arr[15])); 329 bean.setErrorInfo((String) arr[16], (String) arr[17]); 330 bean.setExternalId((String) arr[18]); 331 bean.setExternalStatus((String) arr[19]); 332 bean.setExternalChildIDsBlob((StringBlob) arr[20]); 333 bean.setConfBlob((StringBlob) arr[21]); 334 bean.setDataBlob((StringBlob) arr[22]); 335 bean.setStatsBlob((StringBlob) arr[23]); 336 break; 337 case GET_ACTION_COMPLETED: 338 bean = new WorkflowActionBean(); 339 arr = (Object[]) ret; 340 bean.setId((String) arr[0]); 341 bean.setJobId((String) arr[1]); 342 bean.setStatusStr((String) arr[2]); 343 bean.setType((String) arr[3]); 344 bean.setLogToken((String) arr[4]); 345 break; 346 case GET_RUNNING_ACTIONS: 347 bean = new WorkflowActionBean(); 348 bean.setId((String)ret); 349 break; 350 case GET_PENDING_ACTIONS: 351 bean = new WorkflowActionBean(); 352 arr = (Object[]) ret; 353 bean.setId((String) arr[0]); 354 bean.setJobId((String) arr[1]); 355 bean.setStatusStr((String) arr[2]); 356 bean.setType((String) arr[3]); 357 bean.setPendingAge(DateUtils.toDate((Timestamp) arr[4])); 358 break; 359 case GET_ACTIONS_FOR_WORKFLOW_RERUN: 360 bean = new WorkflowActionBean(); 361 arr = (Object[]) ret; 362 bean.setId((String) arr[0]); 363 bean.setName((String) arr[1]); 364 bean.setStatusStr((String) arr[2]); 365 bean.setEndTime(DateUtils.toDate((Timestamp) arr[3])); 366 bean.setType((String) arr[4]); 367 break; 368 case GET_ACTION_FOR_SLA: 369 bean = new WorkflowActionBean(); 370 arr = (Object[]) ret; 371 bean.setId((String) arr[0]); 372 bean.setStatusStr((String) arr[1]); 373 bean.setStartTime(DateUtils.toDate((Timestamp) arr[2])); 374 bean.setEndTime(DateUtils.toDate((Timestamp) arr[3])); 375 break; 376 377 default: 378 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " 379 + namedQuery.name()); 380 } 381 return bean; 382 } 383 384 @Override 385 public WorkflowActionBean get(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 386 WorkflowActionBean bean = getIfExist(namedQuery, parameters); 387 if (bean == null) { 388 throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery, 389 Services.get().get(JPAService.class).getEntityManager(), parameters).toString()); 390 } 391 return bean; 392 } 393 394 @Override 395 public WorkflowActionBean getIfExist(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 396 JPAService jpaService = Services.get().get(JPAService.class); 397 EntityManager em = jpaService.getEntityManager(); 398 Query query = getSelectQuery(namedQuery, em, parameters); 399 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 400 if (ret == null) { 401 return null; 402 } 403 WorkflowActionBean bean = constructBean(namedQuery, ret); 404 return bean; 405 } 406 407 @Override 408 public List<WorkflowActionBean> getList(WorkflowActionQuery namedQuery, Object... parameters) 409 throws JPAExecutorException { 410 JPAService jpaService = Services.get().get(JPAService.class); 411 EntityManager em = jpaService.getEntityManager(); 412 Query query = getSelectQuery(namedQuery, em, parameters); 413 List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); 414 List<WorkflowActionBean> beanList = new ArrayList<WorkflowActionBean>(); 415 if (retList != null) { 416 for (Object ret : retList) { 417 beanList.add(constructBean(namedQuery, ret)); 418 } 419 } 420 return beanList; 421 } 422 423 @Override 424 public Object getSingleValue(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 425 throw new UnsupportedOperationException(); 426 } 427}