001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.executor.jpa; 019 020import java.sql.Timestamp; 021import java.util.ArrayList; 022import java.util.List; 023 024import javax.persistence.EntityManager; 025import javax.persistence.Query; 026 027import org.apache.oozie.ErrorCode; 028import org.apache.oozie.StringBlob; 029import org.apache.oozie.WorkflowActionBean; 030import org.apache.oozie.service.JPAService; 031import org.apache.oozie.service.Services; 032import org.apache.oozie.util.DateUtils; 033 034/** 035 * Query Executor that provides API to run query for Workflow Action 036 */ 037public class WorkflowActionQueryExecutor extends 038 QueryExecutor<WorkflowActionBean, WorkflowActionQueryExecutor.WorkflowActionQuery> { 039 040 public enum WorkflowActionQuery { 041 UPDATE_ACTION, 042 UPDATE_ACTION_FOR_LAST_CHECKED_TIME, 043 UPDATE_ACTION_START, 044 UPDATE_ACTION_CHECK, 045 UPDATE_ACTION_END, 046 UPDATE_ACTION_PENDING, 047 UPDATE_ACTION_STATUS_PENDING, 048 UPDATE_ACTION_PENDING_TRANS, 049 UPDATE_ACTION_PENDING_TRANS_ERROR, 050 GET_ACTION, 051 GET_ACTION_ID_TYPE_LASTCHECK, 052 GET_ACTION_FAIL, 053 GET_ACTION_SIGNAL, 054 GET_ACTION_CHECK, 055 GET_ACTION_END, 056 GET_ACTION_COMPLETED, 057 GET_RUNNING_ACTIONS, 058 GET_PENDING_ACTIONS, 059 GET_ACTIONS_FOR_WORKFLOW_RERUN 060 }; 061 062 private static WorkflowActionQueryExecutor instance = new WorkflowActionQueryExecutor(); 063 064 private WorkflowActionQueryExecutor() { 065 } 066 067 public static QueryExecutor<WorkflowActionBean, WorkflowActionQuery> getInstance() { 068 return WorkflowActionQueryExecutor.instance; 069 } 070 071 @Override 072 public Query getUpdateQuery(WorkflowActionQuery namedQuery, WorkflowActionBean actionBean, EntityManager em) 073 throws JPAExecutorException { 074 Query query = em.createNamedQuery(namedQuery.name()); 075 switch (namedQuery) { 076 case UPDATE_ACTION: 077 query.setParameter("conf", actionBean.getConfBlob()); 078 query.setParameter("consoleUrl", actionBean.getConsoleUrl()); 079 query.setParameter("data", actionBean.getDataBlob()); 080 query.setParameter("stats", actionBean.getStatsBlob()); 081 query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob()); 082 query.setParameter("errorCode", actionBean.getErrorCode()); 083 query.setParameter("errorMessage", actionBean.getErrorMessage()); 084 query.setParameter("externalId", actionBean.getExternalId()); 085 query.setParameter("externalStatus", actionBean.getExternalStatus()); 086 query.setParameter("name", actionBean.getName()); 087 query.setParameter("cred", actionBean.getCred()); 088 query.setParameter("retries", actionBean.getRetries()); 089 query.setParameter("trackerUri", actionBean.getTrackerUri()); 090 query.setParameter("transition", actionBean.getTransition()); 091 query.setParameter("type", actionBean.getType()); 092 query.setParameter("endTime", actionBean.getEndTimestamp()); 093 query.setParameter("executionPath", actionBean.getExecutionPath()); 094 query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp()); 095 query.setParameter("logToken", actionBean.getLogToken()); 096 query.setParameter("pending", actionBean.getPending()); 097 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 098 query.setParameter("signalValue", actionBean.getSignalValue()); 099 query.setParameter("slaXml", actionBean.getSlaXmlBlob()); 100 query.setParameter("startTime", actionBean.getStartTimestamp()); 101 query.setParameter("status", actionBean.getStatusStr()); 102 query.setParameter("wfId", actionBean.getWfId()); 103 query.setParameter("id", actionBean.getId()); 104 break; 105 case UPDATE_ACTION_FOR_LAST_CHECKED_TIME: 106 query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp()); 107 query.setParameter("id", actionBean.getId()); 108 break; 109 case UPDATE_ACTION_PENDING: 110 query.setParameter("pending", actionBean.getPending()); 111 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 112 query.setParameter("id", actionBean.getId()); 113 break; 114 case UPDATE_ACTION_STATUS_PENDING: 115 query.setParameter("status", actionBean.getStatus().toString()); 116 query.setParameter("pending", actionBean.getPending()); 117 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 118 query.setParameter("id", actionBean.getId()); 119 break; 120 case UPDATE_ACTION_PENDING_TRANS: 121 query.setParameter("transition", actionBean.getTransition()); 122 query.setParameter("pending", actionBean.getPending()); 123 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 124 query.setParameter("id", actionBean.getId()); 125 break; 126 case UPDATE_ACTION_PENDING_TRANS_ERROR: 127 query.setParameter("transition", actionBean.getTransition()); 128 query.setParameter("pending", actionBean.getPending()); 129 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 130 query.setParameter("errorCode", actionBean.getErrorCode()); 131 query.setParameter("errorMessage", actionBean.getErrorMessage()); 132 query.setParameter("id", actionBean.getId()); 133 break; 134 case UPDATE_ACTION_START: 135 query.setParameter("startTime", actionBean.getStartTimestamp()); 136 query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob()); 137 query.setParameter("conf", actionBean.getConfBlob()); 138 query.setParameter("errorCode", actionBean.getErrorCode()); 139 query.setParameter("errorMessage", actionBean.getErrorMessage()); 140 query.setParameter("externalId", actionBean.getExternalId()); 141 query.setParameter("trackerUri", actionBean.getTrackerUri()); 142 query.setParameter("consoleUrl", actionBean.getConsoleUrl()); 143 query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp()); 144 query.setParameter("status", actionBean.getStatus().toString()); 145 query.setParameter("externalStatus", actionBean.getExternalStatus()); 146 query.setParameter("data", actionBean.getDataBlob()); 147 query.setParameter("retries", actionBean.getRetries()); 148 query.setParameter("pending", actionBean.getPending()); 149 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 150 query.setParameter("userRetryCount", actionBean.getUserRetryCount()); 151 query.setParameter("id", actionBean.getId()); 152 break; 153 case UPDATE_ACTION_CHECK: 154 query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob()); 155 query.setParameter("externalStatus", actionBean.getExternalStatus()); 156 query.setParameter("status", actionBean.getStatus().toString()); 157 query.setParameter("data", actionBean.getDataBlob()); 158 query.setParameter("pending", actionBean.getPending()); 159 query.setParameter("errorCode", actionBean.getErrorCode()); 160 query.setParameter("errorMessage", actionBean.getErrorMessage()); 161 query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp()); 162 query.setParameter("retries", actionBean.getRetries()); 163 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 164 query.setParameter("startTime", actionBean.getStartTimestamp()); 165 query.setParameter("stats", actionBean.getStatsBlob()); 166 query.setParameter("userRetryCount", actionBean.getUserRetryCount()); 167 query.setParameter("id", actionBean.getId()); 168 break; 169 case UPDATE_ACTION_END: 170 query.setParameter("errorCode", actionBean.getErrorCode()); 171 query.setParameter("errorMessage", actionBean.getErrorMessage()); 172 query.setParameter("retries", actionBean.getRetries()); 173 query.setParameter("status", actionBean.getStatus().toString()); 174 query.setParameter("endTime", actionBean.getEndTimestamp()); 175 query.setParameter("pending", actionBean.getPending()); 176 query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp()); 177 query.setParameter("signalValue", actionBean.getSignalValue()); 178 query.setParameter("userRetryCount", actionBean.getUserRetryCount()); 179 query.setParameter("externalStatus", actionBean.getExternalStatus()); 180 query.setParameter("stats", actionBean.getStatsBlob()); 181 query.setParameter("id", actionBean.getId()); 182 break; 183 default: 184 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 185 + namedQuery.name()); 186 } 187 return query; 188 } 189 190 @Override 191 public Query getSelectQuery(WorkflowActionQuery namedQuery, EntityManager em, Object... parameters) 192 throws JPAExecutorException { 193 Query query = em.createNamedQuery(namedQuery.name()); 194 switch (namedQuery) { 195 case GET_ACTION: 196 case GET_ACTION_ID_TYPE_LASTCHECK: 197 case GET_ACTION_FAIL: 198 case GET_ACTION_SIGNAL: 199 case GET_ACTION_CHECK: 200 case GET_ACTION_END: 201 case GET_ACTION_COMPLETED: 202 query.setParameter("id", parameters[0]); 203 break; 204 case GET_RUNNING_ACTIONS: 205 Timestamp ts = new Timestamp(System.currentTimeMillis() - (Integer) parameters[0] * 1000); 206 query.setParameter("lastCheckTime", ts); 207 break; 208 case GET_PENDING_ACTIONS: 209 Long minimumPendingAgeSecs = (Long) parameters[0]; 210 Timestamp pts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000); 211 query.setParameter("pendingAge", pts); 212 break; 213 case GET_ACTIONS_FOR_WORKFLOW_RERUN: 214 query.setParameter("wfId", parameters[0]); 215 break; 216 default: 217 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 218 + namedQuery.name()); 219 } 220 return query; 221 } 222 223 @Override 224 public int executeUpdate(WorkflowActionQuery namedQuery, WorkflowActionBean actionBean) throws JPAExecutorException { 225 JPAService jpaService = Services.get().get(JPAService.class); 226 EntityManager em = jpaService.getEntityManager(); 227 Query query = getUpdateQuery(namedQuery, actionBean, em); 228 int ret = jpaService.executeUpdate(namedQuery.name(), query, em); 229 return ret; 230 } 231 232 private WorkflowActionBean constructBean(WorkflowActionQuery namedQuery, Object ret) throws JPAExecutorException { 233 WorkflowActionBean bean; 234 Object[] arr; 235 switch (namedQuery) { 236 case GET_ACTION: 237 bean = (WorkflowActionBean) ret; 238 break; 239 case GET_ACTION_ID_TYPE_LASTCHECK: 240 bean = new WorkflowActionBean(); 241 arr = (Object[]) ret; 242 bean.setId((String) arr[0]); 243 bean.setType((String) arr[1]); 244 bean.setLastCheckTime(DateUtils.toDate((Timestamp) arr[2])); 245 break; 246 case GET_ACTION_FAIL: 247 bean = new WorkflowActionBean(); 248 arr = (Object[]) ret; 249 bean.setId((String) arr[0]); 250 bean.setJobId((String) arr[1]); 251 bean.setName((String) arr[2]); 252 bean.setStatusStr((String) arr[3]); 253 bean.setPending((Integer) arr[4]); 254 bean.setType((String) arr[5]); 255 bean.setLogToken((String) arr[6]); 256 bean.setTransition((String) arr[7]); 257 bean.setErrorInfo((String) arr[8], (String) arr[9]); 258 break; 259 case GET_ACTION_SIGNAL: 260 bean = new WorkflowActionBean(); 261 arr = (Object[]) ret; 262 bean.setId((String) arr[0]); 263 bean.setJobId((String) arr[1]); 264 bean.setName((String) arr[2]); 265 bean.setStatusStr((String) arr[3]); 266 bean.setPending((Integer) arr[4]); 267 bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5])); 268 bean.setType((String) arr[6]); 269 bean.setLogToken((String) arr[7]); 270 bean.setTransition((String) arr[8]); 271 bean.setErrorInfo((String) arr[9], (String) arr[10]); 272 bean.setExecutionPath((String) arr[11]); 273 bean.setSignalValue((String) arr[12]); 274 bean.setSlaXmlBlob((StringBlob) arr[13]); 275 break; 276 case GET_ACTION_CHECK: 277 bean = new WorkflowActionBean(); 278 arr = (Object[]) ret; 279 bean.setId((String) arr[0]); 280 bean.setJobId((String) arr[1]); 281 bean.setName((String) arr[2]); 282 bean.setStatusStr((String) arr[3]); 283 bean.setPending((Integer) arr[4]); 284 bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5])); 285 bean.setType((String) arr[6]); 286 bean.setLogToken((String) arr[7]); 287 bean.setTransition((String) arr[8]); 288 bean.setRetries((Integer) arr[9]); 289 bean.setUserRetryCount((Integer) arr[10]); 290 bean.setUserRetryMax((Integer) arr[11]); 291 bean.setUserRetryInterval((Integer) arr[12]); 292 bean.setTrackerUri((String) arr[13]); 293 bean.setStartTime(DateUtils.toDate((Timestamp) arr[14])); 294 bean.setEndTime(DateUtils.toDate((Timestamp) arr[15])); 295 bean.setLastCheckTime(DateUtils.toDate((Timestamp) arr[16])); 296 bean.setErrorInfo((String) arr[17], (String) arr[18]); 297 bean.setExternalId((String) arr[19]); 298 bean.setExternalStatus((String) arr[20]); 299 bean.setExternalChildIDsBlob((StringBlob) arr[21]); 300 bean.setConfBlob((StringBlob) arr[22]); 301 break; 302 case GET_ACTION_END: 303 bean = new WorkflowActionBean(); 304 arr = (Object[]) ret; 305 bean.setId((String) arr[0]); 306 bean.setJobId((String) arr[1]); 307 bean.setName((String) arr[2]); 308 bean.setStatusStr((String) arr[3]); 309 bean.setPending((Integer) arr[4]); 310 bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5])); 311 bean.setType((String) arr[6]); 312 bean.setLogToken((String) arr[7]); 313 bean.setTransition((String) arr[8]); 314 bean.setRetries((Integer) arr[9]); 315 bean.setTrackerUri((String) arr[10]); 316 bean.setUserRetryCount((Integer) arr[11]); 317 bean.setUserRetryMax((Integer) arr[12]); 318 bean.setUserRetryInterval((Integer) arr[13]); 319 bean.setStartTime(DateUtils.toDate((Timestamp) arr[14])); 320 bean.setEndTime(DateUtils.toDate((Timestamp) arr[15])); 321 bean.setErrorInfo((String) arr[16], (String) arr[17]); 322 bean.setExternalId((String) arr[18]); 323 bean.setExternalStatus((String) arr[19]); 324 bean.setExternalChildIDsBlob((StringBlob) arr[20]); 325 bean.setConfBlob((StringBlob) arr[21]); 326 bean.setDataBlob((StringBlob) arr[22]); 327 bean.setStatsBlob((StringBlob) arr[23]); 328 break; 329 case GET_ACTION_COMPLETED: 330 bean = new WorkflowActionBean(); 331 arr = (Object[]) ret; 332 bean.setId((String) arr[0]); 333 bean.setJobId((String) arr[1]); 334 bean.setStatusStr((String) arr[2]); 335 bean.setType((String) arr[3]); 336 bean.setLogToken((String) arr[4]); 337 break; 338 case GET_RUNNING_ACTIONS: 339 bean = new WorkflowActionBean(); 340 bean.setId((String)ret); 341 break; 342 case GET_PENDING_ACTIONS: 343 bean = new WorkflowActionBean(); 344 arr = (Object[]) ret; 345 bean.setId((String) arr[0]); 346 bean.setJobId((String) arr[1]); 347 bean.setStatusStr((String) arr[2]); 348 bean.setType((String) arr[3]); 349 bean.setPendingAge(DateUtils.toDate((Timestamp) arr[4])); 350 break; 351 case GET_ACTIONS_FOR_WORKFLOW_RERUN: 352 bean = new WorkflowActionBean(); 353 arr = (Object[]) ret; 354 bean.setId((String) arr[0]); 355 bean.setName((String) arr[1]); 356 bean.setStatusStr((String) arr[2]); 357 bean.setEndTime(DateUtils.toDate((Timestamp) arr[3])); 358 break; 359 default: 360 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " 361 + namedQuery.name()); 362 } 363 return bean; 364 } 365 366 @Override 367 public WorkflowActionBean get(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 368 JPAService jpaService = Services.get().get(JPAService.class); 369 EntityManager em = jpaService.getEntityManager(); 370 Query query = getSelectQuery(namedQuery, em, parameters); 371 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 372 if (ret == null) { 373 throw new JPAExecutorException(ErrorCode.E0605, query.toString()); 374 } 375 WorkflowActionBean bean = constructBean(namedQuery, ret); 376 return bean; 377 } 378 379 @Override 380 public List<WorkflowActionBean> getList(WorkflowActionQuery namedQuery, Object... parameters) 381 throws JPAExecutorException { 382 JPAService jpaService = Services.get().get(JPAService.class); 383 EntityManager em = jpaService.getEntityManager(); 384 Query query = getSelectQuery(namedQuery, em, parameters); 385 List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); 386 List<WorkflowActionBean> beanList = new ArrayList<WorkflowActionBean>(); 387 if (retList != null) { 388 for (Object ret : retList) { 389 beanList.add(constructBean(namedQuery, ret)); 390 } 391 } 392 return beanList; 393 } 394 395 @Override 396 public Object getSingleValue(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 397 throw new UnsupportedOperationException(); 398 } 399}