001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.executor.jpa; 019 020import java.sql.Timestamp; 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.List; 024 025import javax.persistence.EntityManager; 026import javax.persistence.Query; 027 028import org.apache.oozie.CoordinatorActionBean; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.service.JPAService; 031import org.apache.oozie.service.Services; 032 033/** 034 * Query Executor that provides API to run query for Coordinator Action 035 */ 036public class CoordActionQueryExecutor extends 037 QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> { 038 039 public enum CoordActionQuery { 040 UPDATE_COORD_ACTION, 041 UPDATE_COORD_ACTION_STATUS_PENDING_TIME, 042 UPDATE_COORD_ACTION_FOR_INPUTCHECK, 043 UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, 044 UPDATE_COORD_ACTION_DEPENDENCIES, 045 UPDATE_COORD_ACTION_FOR_START, 046 UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, 047 UPDATE_COORD_ACTION_RERUN, 048 GET_COORD_ACTION, 049 GET_COORD_ACTION_STATUS, 050 GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID, 051 GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME 052 }; 053 054 private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor(); 055 056 private CoordActionQueryExecutor() { 057 } 058 059 public static QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> getInstance() { 060 return CoordActionQueryExecutor.instance; 061 } 062 063 @Override 064 public Query getUpdateQuery(CoordActionQuery namedQuery, CoordinatorActionBean actionBean, EntityManager em) 065 throws JPAExecutorException { 066 067 Query query = em.createNamedQuery(namedQuery.name()); 068 switch (namedQuery) { 069 case UPDATE_COORD_ACTION: 070 query.setParameter("actionNumber", actionBean.getActionNumber()); 071 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 072 query.setParameter("consoleUrl", actionBean.getConsoleUrl()); 073 query.setParameter("createdConf", actionBean.getCreatedConfBlob()); 074 query.setParameter("errorCode", actionBean.getErrorCode()); 075 query.setParameter("errorMessage", actionBean.getErrorMessage()); 076 query.setParameter("externalStatus", actionBean.getExternalStatus()); 077 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 078 query.setParameter("runConf", actionBean.getRunConfBlob()); 079 query.setParameter("timeOut", actionBean.getTimeOut()); 080 query.setParameter("trackerUri", actionBean.getTrackerUri()); 081 query.setParameter("type", actionBean.getType()); 082 query.setParameter("createdTime", actionBean.getCreatedTimestamp()); 083 query.setParameter("externalId", actionBean.getExternalId()); 084 query.setParameter("jobId", actionBean.getJobId()); 085 query.setParameter("lastModifiedTime", new Date()); 086 query.setParameter("nominalTime", actionBean.getNominalTimestamp()); 087 query.setParameter("slaXml", actionBean.getSlaXmlBlob()); 088 query.setParameter("status", actionBean.getStatus().toString()); 089 query.setParameter("id", actionBean.getId()); 090 break; 091 092 case UPDATE_COORD_ACTION_STATUS_PENDING_TIME: 093 query.setParameter("status", actionBean.getStatus().toString()); 094 query.setParameter("pending", actionBean.getPending()); 095 query.setParameter("lastModifiedTime", new Date()); 096 query.setParameter("id", actionBean.getId()); 097 break; 098 099 case UPDATE_COORD_ACTION_FOR_INPUTCHECK: 100 query.setParameter("status", actionBean.getStatus().toString()); 101 query.setParameter("lastModifiedTime", new Date()); 102 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 103 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 104 query.setParameter("id", actionBean.getId()); 105 break; 106 107 case UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK: 108 query.setParameter("status", actionBean.getStatus().toString()); 109 query.setParameter("lastModifiedTime", new Date()); 110 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 111 query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); 112 query.setParameter("id", actionBean.getId()); 113 break; 114 115 case UPDATE_COORD_ACTION_DEPENDENCIES: 116 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 117 query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); 118 query.setParameter("id", actionBean.getId()); 119 break; 120 121 case UPDATE_COORD_ACTION_FOR_START: 122 query.setParameter("status", actionBean.getStatus().toString()); 123 query.setParameter("lastModifiedTime", new Date()); 124 query.setParameter("runConf", actionBean.getRunConfBlob()); 125 query.setParameter("externalId", actionBean.getExternalId()); 126 query.setParameter("pending", actionBean.getPending()); 127 query.setParameter("errorCode", actionBean.getErrorCode()); 128 query.setParameter("errorMessage", actionBean.getErrorMessage()); 129 query.setParameter("id", actionBean.getId()); 130 break; 131 132 case UPDATE_COORD_ACTION_FOR_MODIFIED_DATE: 133 query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp()); 134 query.setParameter("id", actionBean.getId()); 135 break; 136 137 case UPDATE_COORD_ACTION_RERUN: 138 query.setParameter("actionXml", actionBean.getActionXmlBlob()); 139 query.setParameter("status", actionBean.getStatusStr()); 140 query.setParameter("externalId", actionBean.getExternalId()); 141 query.setParameter("externalStatus", actionBean.getExternalStatus()); 142 query.setParameter("rerunTime", actionBean.getRerunTimestamp()); 143 query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp()); 144 query.setParameter("createdTime", actionBean.getCreatedTimestamp()); 145 query.setParameter("createdConf", actionBean.getCreatedConfBlob()); 146 query.setParameter("runConf", actionBean.getRunConfBlob()); 147 query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); 148 query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); 149 query.setParameter("errorCode", actionBean.getErrorCode()); 150 query.setParameter("errorMessage", actionBean.getErrorMessage()); 151 query.setParameter("id", actionBean.getId()); 152 break; 153 154 default: 155 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 156 + namedQuery.name()); 157 } 158 return query; 159 } 160 161 @Override 162 public Query getSelectQuery(CoordActionQuery namedQuery, EntityManager em, Object... parameters) 163 throws JPAExecutorException { 164 Query query = em.createNamedQuery(namedQuery.name()); 165 CoordActionQuery caQuery = (CoordActionQuery) namedQuery; 166 switch (caQuery) { 167 case GET_COORD_ACTION: 168 case GET_COORD_ACTION_STATUS: 169 query.setParameter("id", parameters[0]); 170 break; 171 case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: 172 query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); 173 break; 174 default: 175 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 176 + caQuery.name()); 177 } 178 return query; 179 } 180 181 @Override 182 public int executeUpdate(CoordActionQuery namedQuery, CoordinatorActionBean jobBean) throws JPAExecutorException { 183 JPAService jpaService = Services.get().get(JPAService.class); 184 EntityManager em = jpaService.getEntityManager(); 185 Query query = getUpdateQuery(namedQuery, jobBean, em); 186 int ret = jpaService.executeUpdate(namedQuery.name(), query, em); 187 return ret; 188 } 189 190 @Override 191 public CoordinatorActionBean get(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 192 JPAService jpaService = Services.get().get(JPAService.class); 193 EntityManager em = jpaService.getEntityManager(); 194 Query query = getSelectQuery(namedQuery, em, parameters); 195 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 196 if (ret == null) { 197 throw new JPAExecutorException(ErrorCode.E0605, query.toString()); 198 } 199 CoordinatorActionBean bean = constructBean(namedQuery, ret); 200 return bean; 201 } 202 203 @Override 204 public List<CoordinatorActionBean> getList(CoordActionQuery namedQuery, Object... parameters) 205 throws JPAExecutorException { 206 JPAService jpaService = Services.get().get(JPAService.class); 207 EntityManager em = jpaService.getEntityManager(); 208 Query query = getSelectQuery(namedQuery, em, parameters); 209 List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); 210 List<CoordinatorActionBean> beanList = new ArrayList<CoordinatorActionBean>(); 211 if (retList != null) { 212 for (Object ret : retList) { 213 beanList.add(constructBean(namedQuery, ret)); 214 } 215 } 216 return beanList; 217 } 218 219 private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException { 220 CoordinatorActionBean bean; 221 Object[] arr; 222 switch (namedQuery) { 223 case GET_COORD_ACTION: 224 bean = (CoordinatorActionBean) ret; 225 break; 226 case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: 227 bean = new CoordinatorActionBean(); 228 bean.setJobId((String) ret); 229 break; 230 case GET_COORD_ACTION_STATUS: 231 bean = new CoordinatorActionBean(); 232 bean.setStatusStr((String)ret); 233 break; 234 default: 235 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " 236 + namedQuery.name()); 237 } 238 return bean; 239 } 240 241 @Override 242 public Object getSingleValue(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 243 throw new UnsupportedOperationException(); 244 } 245}