/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.executor.jpa;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

import javax.persistence.EntityManager;
import javax.persistence.Query;

import org.apache.oozie.BinaryBlob;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.StringBlob;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;

/**
 * Query Executor that provides API to run query for Workflow Job.
 * <p>
 * Each {@link WorkflowJobQuery} constant is used verbatim (via {@code name()}) as the name of a JPA named
 * query. The named queries themselves are declared outside this file; the parameter names bound here and the
 * column positions read in {@code constructBean} have to stay in step with those declarations.
 */
public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, WorkflowJobQueryExecutor.WorkflowJobQuery> {

    /**
     * Named queries supported by this executor. {@code UPDATE_*} constants are handled by
     * {@link #getUpdateQuery}, {@code GET_*} constants by {@link #getSelectQuery}.
     */
    public enum WorkflowJobQuery {
        UPDATE_WORKFLOW,
        UPDATE_WORKFLOW_MODTIME,
        UPDATE_WORKFLOW_STATUS_MODTIME,
        UPDATE_WORKFLOW_PARENT_MODIFIED,
        UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
        UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END,
        UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END,
        UPDATE_WORKFLOW_RERUN,
        GET_WORKFLOW,
        GET_WORKFLOW_STARTTIME,
        GET_WORKFLOW_START_END_TIME,
        GET_WORKFLOW_USER_GROUP,
        GET_WORKFLOW_SUSPEND,
        GET_WORKFLOW_ACTION_OP,
        GET_WORKFLOW_RERUN,
        GET_WORKFLOW_DEFINITION,
        GET_WORKFLOW_KILL,
        GET_WORKFLOW_RESUME,
        GET_WORKFLOW_STATUS,
        GET_WORKFLOWS_PARENT_COORD_RERUN,
        GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN,
        GET_WORKFLOW_FOR_SLA
    }

    /** The single shared executor; created eagerly and never reassigned. */
    private static final WorkflowJobQueryExecutor instance = new WorkflowJobQueryExecutor();

    /** Not instantiable from outside; use {@link #getInstance()}. */
    private WorkflowJobQueryExecutor() {
    }

    /**
     * Returns the shared executor instance.
     *
     * @return the singleton query executor for workflow jobs
     */
    public static QueryExecutor<WorkflowJobBean, WorkflowJobQueryExecutor.WorkflowJobQuery> getInstance() {
        return WorkflowJobQueryExecutor.instance;
    }

    /**
     * Creates the named update query and binds its parameters from the given bean.
     *
     * @param namedQuery the update query to build
     * @param wfBean bean supplying the values to bind
     * @param em entity manager used to create the query
     * @return the query with all parameters set
     * @throws JPAExecutorException with {@code E0603} if {@code namedQuery} is not an update query handled here
     */
    @Override
    public Query getUpdateQuery(WorkflowJobQuery namedQuery, WorkflowJobBean wfBean, EntityManager em)
            throws JPAExecutorException {

        Query query = em.createNamedQuery(namedQuery.name());
        switch (namedQuery) {
            case UPDATE_WORKFLOW:
                query.setParameter("appName", wfBean.getAppName());
                query.setParameter("appPath", wfBean.getAppPath());
                query.setParameter("conf", wfBean.getConfBlob());
                // This query binds the group as "groupName"; UPDATE_WORKFLOW_RERUN below binds it as "group".
                query.setParameter("groupName", wfBean.getGroup());
                query.setParameter("run", wfBean.getRun());
                query.setParameter("user", wfBean.getUser());
                query.setParameter("createdTime", wfBean.getCreatedTimestamp());
                query.setParameter("endTime", wfBean.getEndTimestamp());
                query.setParameter("externalId", wfBean.getExternalId());
                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
                query.setParameter("logToken", wfBean.getLogToken());
                query.setParameter("protoActionConf", wfBean.getProtoActionConfBlob());
                query.setParameter("slaXml", wfBean.getSlaXmlBlob());
                query.setParameter("startTime", wfBean.getStartTimestamp());
                query.setParameter("status", wfBean.getStatusStr());
                query.setParameter("wfInstance", wfBean.getWfInstanceBlob());
                query.setParameter("id", wfBean.getId());
                break;
            case UPDATE_WORKFLOW_MODTIME:
                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
                query.setParameter("id", wfBean.getId());
                break;
            case UPDATE_WORKFLOW_STATUS_MODTIME:
                query.setParameter("status", wfBean.getStatus().toString());
                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
                query.setParameter("id", wfBean.getId());
                break;
            case UPDATE_WORKFLOW_PARENT_MODIFIED:
                query.setParameter("parentId", wfBean.getParentId());
                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
                query.setParameter("id", wfBean.getId());
                break;
            case UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED:
                query.setParameter("status", wfBean.getStatus().toString());
                query.setParameter("wfInstance", wfBean.getWfInstanceBlob());
                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
                query.setParameter("id", wfBean.getId());
                break;
            case UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END:
                query.setParameter("status", wfBean.getStatus().toString());
                query.setParameter("wfInstance", wfBean.getWfInstanceBlob());
                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
                query.setParameter("endTime", wfBean.getEndTimestamp());
                query.setParameter("id", wfBean.getId());
                break;
            case UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END:
                query.setParameter("status", wfBean.getStatus().toString());
                query.setParameter("wfInstance", wfBean.getWfInstanceBlob());
                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
                query.setParameter("startTime", wfBean.getStartTimestamp());
                query.setParameter("endTime", wfBean.getEndTimestamp());
                query.setParameter("id", wfBean.getId());
                break;
            case UPDATE_WORKFLOW_RERUN:
                query.setParameter("appName", wfBean.getAppName());
                query.setParameter("protoActionConf", wfBean.getProtoActionConfBlob());
                query.setParameter("appPath", wfBean.getAppPath());
                query.setParameter("conf", wfBean.getConfBlob());
                query.setParameter("logToken", wfBean.getLogToken());
                query.setParameter("user", wfBean.getUser());
                query.setParameter("group", wfBean.getGroup());
                query.setParameter("externalId", wfBean.getExternalId());
                query.setParameter("endTime", wfBean.getEndTimestamp());
                query.setParameter("run", wfBean.getRun());
                query.setParameter("status", wfBean.getStatus().toString());
                query.setParameter("wfInstance", wfBean.getWfInstanceBlob());
                query.setParameter("lastModTime", wfBean.getLastModifiedTimestamp());
                query.setParameter("id", wfBean.getId());
                break;
            default:
                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                        + namedQuery.name());
        }
        return query;
    }

    /**
     * Creates the named select query and binds its parameters from the positional arguments.
     * <ul>
     * <li>Single-job {@code GET_WORKFLOW*} queries: {@code parameters[0]} is bound as {@code id}.</li>
     * <li>{@code GET_WORKFLOWS_PARENT_COORD_RERUN}: {@code parameters[0]} is bound as {@code parentId}.</li>
     * <li>{@code GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN}: {@code parameters[0]} is a {@code Long} number of
     * days, {@code parameters[1]} an {@code Integer} first-result offset and {@code parameters[2]} an
     * {@code Integer} maximum result count.</li>
     * </ul>
     *
     * @param namedQuery the select query to build
     * @param em entity manager used to create the query
     * @param parameters positional arguments as described above
     * @return the query with all parameters set
     * @throws JPAExecutorException with {@code E0603} if {@code namedQuery} is not a select query handled here
     */
    @Override
    public Query getSelectQuery(WorkflowJobQuery namedQuery, EntityManager em, Object... parameters)
            throws JPAExecutorException {
        Query query = em.createNamedQuery(namedQuery.name());
        switch (namedQuery) {
            case GET_WORKFLOW:
            case GET_WORKFLOW_STARTTIME:
            case GET_WORKFLOW_START_END_TIME:
            case GET_WORKFLOW_USER_GROUP:
            case GET_WORKFLOW_SUSPEND:
            case GET_WORKFLOW_ACTION_OP:
            case GET_WORKFLOW_RERUN:
            case GET_WORKFLOW_DEFINITION:
            case GET_WORKFLOW_KILL:
            case GET_WORKFLOW_RESUME:
            case GET_WORKFLOW_STATUS:
            case GET_WORKFLOW_FOR_SLA:
                query.setParameter("id", parameters[0]);
                break;
            case GET_WORKFLOWS_PARENT_COORD_RERUN:
                query.setParameter("parentId", parameters[0]);
                break;
            case GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN: {
                // Braces keep these locals scoped to this case only.
                long dayInMs = 24L * 60 * 60 * 1000;
                long olderThanDays = (Long) parameters[0];
                // Upper bound for "endTime": now minus the requested number of days.
                Timestamp maxEndtime = new Timestamp(System.currentTimeMillis() - (olderThanDays * dayInMs));
                query.setParameter("endTime", maxEndtime);
                query.setFirstResult((Integer) parameters[1]);
                query.setMaxResults((Integer) parameters[2]);
                break;
            }
            default:
                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                        + namedQuery.name());
        }
        return query;
    }

    /**
     * Builds the named update query from the bean and runs it through the {@link JPAService}.
     *
     * @param namedQuery the update query to run
     * @param jobBean bean supplying the values to bind
     * @return the value returned by {@code JPAService.executeUpdate}
     * @throws JPAExecutorException if the query cannot be built or the service reports a failure
     */
    @Override
    public int executeUpdate(WorkflowJobQuery namedQuery, WorkflowJobBean jobBean) throws JPAExecutorException {
        JPAService jpaService = Services.get().get(JPAService.class);
        EntityManager em = jpaService.getEntityManager();
        Query query = getUpdateQuery(namedQuery, jobBean, em);
        return jpaService.executeUpdate(namedQuery.name(), query, em);
    }

    /**
     * Converts one raw query result into a {@link WorkflowJobBean}.
     * <p>
     * {@code GET_WORKFLOW} yields the bean itself and {@code GET_WORKFLOW_STATUS} yields a single string; every
     * other query yields an {@code Object[]} whose positions are copied into a fresh, partially populated bean.
     * The positions presumably mirror the select list of the corresponding named query (declared outside this
     * file) - verify against that declaration before changing any index.
     *
     * @param namedQuery the query that produced {@code ret}
     * @param ret one result row as returned by the JPA service
     * @param parameters the arguments the query was invoked with; only read for {@code GET_WORKFLOW_STATUS}
     * @return the populated bean
     * @throws JPAExecutorException with {@code E0603} if {@code namedQuery} is not a select query handled here
     */
    private WorkflowJobBean constructBean(WorkflowJobQuery namedQuery, Object ret, Object... parameters)
            throws JPAExecutorException {
        WorkflowJobBean bean;
        Object[] arr;
        switch (namedQuery) {
            case GET_WORKFLOW:
                bean = (WorkflowJobBean) ret;
                break;
            case GET_WORKFLOW_STARTTIME:
                bean = new WorkflowJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setStartTime(DateUtils.toDate((Timestamp) arr[1]));
                break;
            case GET_WORKFLOW_START_END_TIME:
                bean = new WorkflowJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setStartTime(DateUtils.toDate((Timestamp) arr[1]));
                bean.setEndTime(DateUtils.toDate((Timestamp) arr[2]));
                break;
            case GET_WORKFLOW_USER_GROUP:
                bean = new WorkflowJobBean();
                arr = (Object[]) ret;
                bean.setUser((String) arr[0]);
                bean.setGroup((String) arr[1]);
                break;
            case GET_WORKFLOW_SUSPEND:
                bean = new WorkflowJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setUser((String) arr[1]);
                bean.setGroup((String) arr[2]);
                bean.setAppName((String) arr[3]);
                bean.setStatusStr((String) arr[4]);
                bean.setParentId((String) arr[5]);
                bean.setStartTime(DateUtils.toDate((Timestamp) arr[6]));
                bean.setEndTime(DateUtils.toDate((Timestamp) arr[7]));
                bean.setLogToken((String) arr[8]);
                bean.setWfInstanceBlob((BinaryBlob) (arr[9]));
                break;
            case GET_WORKFLOW_ACTION_OP:
                bean = new WorkflowJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setUser((String) arr[1]);
                bean.setGroup((String) arr[2]);
                bean.setAppName((String) arr[3]);
                bean.setAppPath((String) arr[4]);
                bean.setStatusStr((String) arr[5]);
                bean.setRun((Integer) arr[6]);
                bean.setParentId((String) arr[7]);
                bean.setLogToken((String) arr[8]);
                bean.setWfInstanceBlob((BinaryBlob) (arr[9]));
                bean.setProtoActionConfBlob((StringBlob) arr[10]);
                break;
            case GET_WORKFLOW_RERUN:
                bean = new WorkflowJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setUser((String) arr[1]);
                bean.setGroup((String) arr[2]);
                bean.setAppName((String) arr[3]);
                bean.setStatusStr((String) arr[4]);
                bean.setRun((Integer) arr[5]);
                bean.setLogToken((String) arr[6]);
                bean.setWfInstanceBlob((BinaryBlob) (arr[7]));
                bean.setParentId((String) arr[8]);
                break;
            case GET_WORKFLOW_DEFINITION:
                bean = new WorkflowJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setUser((String) arr[1]);
                bean.setGroup((String) arr[2]);
                bean.setAppName((String) arr[3]);
                bean.setLogToken((String) arr[4]);
                bean.setWfInstanceBlob((BinaryBlob) (arr[5]));
                break;
            case GET_WORKFLOW_KILL:
                bean = new WorkflowJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setUser((String) arr[1]);
                bean.setGroup((String) arr[2]);
                bean.setAppName((String) arr[3]);
                bean.setAppPath((String) arr[4]);
                bean.setStatusStr((String) arr[5]);
                bean.setParentId((String) arr[6]);
                bean.setStartTime(DateUtils.toDate((Timestamp) arr[7]));
                bean.setEndTime(DateUtils.toDate((Timestamp) arr[8]));
                bean.setLogToken((String) arr[9]);
                bean.setWfInstanceBlob((BinaryBlob) (arr[10]));
                bean.setSlaXmlBlob((StringBlob) arr[11]);
                bean.setProtoActionConfBlob((StringBlob) arr[12]);
                break;
            case GET_WORKFLOW_RESUME:
                bean = new WorkflowJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setUser((String) arr[1]);
                bean.setGroup((String) arr[2]);
                bean.setAppName((String) arr[3]);
                bean.setAppPath((String) arr[4]);
                bean.setStatusStr((String) arr[5]);
                bean.setParentId((String) arr[6]);
                bean.setStartTime(DateUtils.toDate((Timestamp) arr[7]));
                bean.setEndTime(DateUtils.toDate((Timestamp) arr[8]));
                bean.setLogToken((String) arr[9]);
                bean.setWfInstanceBlob((BinaryBlob) (arr[10]));
                bean.setProtoActionConfBlob((StringBlob) arr[11]);
                break;
            case GET_WORKFLOW_STATUS:
                // The result is only the status string, so the id is taken from the lookup argument.
                bean = new WorkflowJobBean();
                bean.setId((String) parameters[0]);
                bean.setStatusStr((String) ret);
                break;
            case GET_WORKFLOWS_PARENT_COORD_RERUN:
            case GET_WORKFLOW_FOR_SLA:
                // Both queries are read with the same layout: id, status, start time, end time.
                bean = new WorkflowJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setStatusStr((String) arr[1]);
                bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
                bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
                break;
            case GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN:
                bean = new WorkflowJobBean();
                arr = (Object[]) ret;
                bean.setId((String) arr[0]);
                bean.setParentId((String) arr[1]);
                break;
            default:
                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
                        + namedQuery.name());
        }
        return bean;
    }

    /**
     * Runs a single-result select query and requires that it yields a row.
     *
     * @param namedQuery the select query to run
     * @param parameters positional arguments, see {@link #getSelectQuery}
     * @return the bean built from the result; never {@code null}
     * @throws JPAExecutorException with {@code E0605} if the query yields no result, or with {@code E0603} if
     *         {@code namedQuery} is not a select query handled here
     */
    @Override
    public WorkflowJobBean get(WorkflowJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
        JPAService jpaService = Services.get().get(JPAService.class);
        EntityManager em = jpaService.getEntityManager();
        Query query = getSelectQuery(namedQuery, em, parameters);
        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
        if (ret == null) {
            // Describe the query that was just executed rather than requesting another EntityManager
            // only to build the message: nothing in this class would ever release that extra one.
            // NOTE(review): relies on Query.toString() being usable after executeGet - confirm.
            throw new JPAExecutorException(ErrorCode.E0605, query.toString());
        }
        return constructBean(namedQuery, ret, parameters);
    }

    /**
     * Runs a single-result select query, tolerating a missing row.
     *
     * @param namedQuery the select query to run
     * @param parameters positional arguments, see {@link #getSelectQuery}
     * @return the bean built from the result, or {@code null} if the JPA service returned no result
     * @throws JPAExecutorException with {@code E0603} if {@code namedQuery} is not a select query handled here
     */
    @Override
    public WorkflowJobBean getIfExist(WorkflowJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
        JPAService jpaService = Services.get().get(JPAService.class);
        EntityManager em = jpaService.getEntityManager();
        Query query = getSelectQuery(namedQuery, em, parameters);
        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
        if (ret == null) {
            return null;
        }
        return constructBean(namedQuery, ret, parameters);
    }

    /**
     * Runs a multi-result select query and converts every row into a bean.
     *
     * @param namedQuery the select query to run
     * @param parameters positional arguments, see {@link #getSelectQuery}
     * @return the beans in result order; empty (never {@code null}) if the JPA service returned no list
     * @throws JPAExecutorException with {@code E0603} if {@code namedQuery} is not a select query handled here
     */
    @Override
    public List<WorkflowJobBean> getList(WorkflowJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
        JPAService jpaService = Services.get().get(JPAService.class);
        EntityManager em = jpaService.getEntityManager();
        Query query = getSelectQuery(namedQuery, em, parameters);
        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
        List<WorkflowJobBean> beanList = new ArrayList<WorkflowJobBean>();
        if (retList != null) {
            for (Object ret : retList) {
                beanList.add(constructBean(namedQuery, ret, parameters));
            }
        }
        return beanList;
    }

    /**
     * Not supported by this executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Object getSingleValue(WorkflowJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
        throw new UnsupportedOperationException();
    }
}