001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import java.sql.Timestamp;
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024
025import javax.persistence.EntityManager;
026import javax.persistence.Query;
027
028import org.apache.oozie.CoordinatorActionBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.service.JPAService;
031import org.apache.oozie.service.Services;
032
033/**
034 * Query Executor that provides API to run query for Coordinator Action
035 */
036public class CoordActionQueryExecutor extends
037        QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> {
038
039    public enum CoordActionQuery {
040        UPDATE_COORD_ACTION,
041        UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
042        UPDATE_COORD_ACTION_FOR_INPUTCHECK,
043        UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK,
044        UPDATE_COORD_ACTION_DEPENDENCIES,
045        UPDATE_COORD_ACTION_FOR_START,
046        UPDATE_COORD_ACTION_FOR_MODIFIED_DATE,
047        UPDATE_COORD_ACTION_RERUN,
048        GET_COORD_ACTION,
049        GET_COORD_ACTION_STATUS,
050        GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID,
051        GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME
052    };
053
054    private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
055
056    private CoordActionQueryExecutor() {
057    }
058
059    public static QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> getInstance() {
060        return CoordActionQueryExecutor.instance;
061    }
062
063    @Override
064    public Query getUpdateQuery(CoordActionQuery namedQuery, CoordinatorActionBean actionBean, EntityManager em)
065            throws JPAExecutorException {
066
067        Query query = em.createNamedQuery(namedQuery.name());
068        switch (namedQuery) {
069            case UPDATE_COORD_ACTION:
070                query.setParameter("actionNumber", actionBean.getActionNumber());
071                query.setParameter("actionXml", actionBean.getActionXmlBlob());
072                query.setParameter("consoleUrl", actionBean.getConsoleUrl());
073                query.setParameter("createdConf", actionBean.getCreatedConfBlob());
074                query.setParameter("errorCode", actionBean.getErrorCode());
075                query.setParameter("errorMessage", actionBean.getErrorMessage());
076                query.setParameter("externalStatus", actionBean.getExternalStatus());
077                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
078                query.setParameter("runConf", actionBean.getRunConfBlob());
079                query.setParameter("timeOut", actionBean.getTimeOut());
080                query.setParameter("trackerUri", actionBean.getTrackerUri());
081                query.setParameter("type", actionBean.getType());
082                query.setParameter("createdTime", actionBean.getCreatedTimestamp());
083                query.setParameter("externalId", actionBean.getExternalId());
084                query.setParameter("jobId", actionBean.getJobId());
085                query.setParameter("lastModifiedTime", new Date());
086                query.setParameter("nominalTime", actionBean.getNominalTimestamp());
087                query.setParameter("slaXml", actionBean.getSlaXmlBlob());
088                query.setParameter("status", actionBean.getStatus().toString());
089                query.setParameter("id", actionBean.getId());
090                break;
091
092            case UPDATE_COORD_ACTION_STATUS_PENDING_TIME:
093                query.setParameter("status", actionBean.getStatus().toString());
094                query.setParameter("pending", actionBean.getPending());
095                query.setParameter("lastModifiedTime", new Date());
096                query.setParameter("id", actionBean.getId());
097                break;
098
099            case UPDATE_COORD_ACTION_FOR_INPUTCHECK:
100                query.setParameter("status", actionBean.getStatus().toString());
101                query.setParameter("lastModifiedTime", new Date());
102                query.setParameter("actionXml", actionBean.getActionXmlBlob());
103                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
104                query.setParameter("id", actionBean.getId());
105                break;
106
107            case UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK:
108                query.setParameter("status", actionBean.getStatus().toString());
109                query.setParameter("lastModifiedTime", new Date());
110                query.setParameter("actionXml", actionBean.getActionXmlBlob());
111                query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob());
112                query.setParameter("id", actionBean.getId());
113                break;
114
115            case UPDATE_COORD_ACTION_DEPENDENCIES:
116                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
117                query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob());
118                query.setParameter("id", actionBean.getId());
119                break;
120
121            case UPDATE_COORD_ACTION_FOR_START:
122                query.setParameter("status", actionBean.getStatus().toString());
123                query.setParameter("lastModifiedTime", new Date());
124                query.setParameter("runConf", actionBean.getRunConfBlob());
125                query.setParameter("externalId", actionBean.getExternalId());
126                query.setParameter("pending", actionBean.getPending());
127                query.setParameter("errorCode", actionBean.getErrorCode());
128                query.setParameter("errorMessage", actionBean.getErrorMessage());
129                query.setParameter("id", actionBean.getId());
130                break;
131
132            case UPDATE_COORD_ACTION_FOR_MODIFIED_DATE:
133                query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp());
134                query.setParameter("id", actionBean.getId());
135                break;
136
137            case UPDATE_COORD_ACTION_RERUN:
138                query.setParameter("actionXml", actionBean.getActionXmlBlob());
139                query.setParameter("status", actionBean.getStatusStr());
140                query.setParameter("externalId", actionBean.getExternalId());
141                query.setParameter("externalStatus", actionBean.getExternalStatus());
142                query.setParameter("rerunTime", actionBean.getRerunTimestamp());
143                query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp());
144                query.setParameter("createdTime", actionBean.getCreatedTimestamp());
145                query.setParameter("createdConf", actionBean.getCreatedConfBlob());
146                query.setParameter("runConf", actionBean.getRunConfBlob());
147                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
148                query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob());
149                query.setParameter("errorCode", actionBean.getErrorCode());
150                query.setParameter("errorMessage", actionBean.getErrorMessage());
151                query.setParameter("id", actionBean.getId());
152                break;
153
154            default:
155                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
156                        + namedQuery.name());
157        }
158        return query;
159    }
160
161    @Override
162    public Query getSelectQuery(CoordActionQuery namedQuery, EntityManager em, Object... parameters)
163            throws JPAExecutorException {
164        Query query = em.createNamedQuery(namedQuery.name());
165        CoordActionQuery caQuery = (CoordActionQuery) namedQuery;
166        switch (caQuery) {
167            case GET_COORD_ACTION:
168            case GET_COORD_ACTION_STATUS:
169                query.setParameter("id", parameters[0]);
170                break;
171            case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
172                query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
173                break;
174            default:
175                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
176                        + caQuery.name());
177        }
178        return query;
179    }
180
181    @Override
182    public int executeUpdate(CoordActionQuery namedQuery, CoordinatorActionBean jobBean) throws JPAExecutorException {
183        JPAService jpaService = Services.get().get(JPAService.class);
184        EntityManager em = jpaService.getEntityManager();
185        Query query = getUpdateQuery(namedQuery, jobBean, em);
186        int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
187        return ret;
188    }
189
190    @Override
191    public CoordinatorActionBean get(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
192        JPAService jpaService = Services.get().get(JPAService.class);
193        EntityManager em = jpaService.getEntityManager();
194        Query query = getSelectQuery(namedQuery, em, parameters);
195        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
196        if (ret == null) {
197            throw new JPAExecutorException(ErrorCode.E0605, query.toString());
198        }
199        CoordinatorActionBean bean = constructBean(namedQuery, ret);
200        return bean;
201    }
202
203    @Override
204    public List<CoordinatorActionBean> getList(CoordActionQuery namedQuery, Object... parameters)
205            throws JPAExecutorException {
206        JPAService jpaService = Services.get().get(JPAService.class);
207        EntityManager em = jpaService.getEntityManager();
208        Query query = getSelectQuery(namedQuery, em, parameters);
209        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
210        List<CoordinatorActionBean> beanList = new ArrayList<CoordinatorActionBean>();
211        if (retList != null) {
212            for (Object ret : retList) {
213                beanList.add(constructBean(namedQuery, ret));
214            }
215        }
216        return beanList;
217    }
218
219    private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException {
220        CoordinatorActionBean bean;
221        Object[] arr;
222        switch (namedQuery) {
223            case GET_COORD_ACTION:
224                bean = (CoordinatorActionBean) ret;
225                break;
226            case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
227                bean = new CoordinatorActionBean();
228                bean.setJobId((String) ret);
229                break;
230            case GET_COORD_ACTION_STATUS:
231                bean = new CoordinatorActionBean();
232                bean.setStatusStr((String)ret);
233                break;
234            default:
235                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
236                        + namedQuery.name());
237        }
238        return bean;
239    }
240
241    @Override
242    public Object getSingleValue(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
243        throw new UnsupportedOperationException();
244    }
245}