001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.List;
024
025import javax.persistence.EntityManager;
026import javax.persistence.Query;
027
028import org.apache.oozie.BundleActionBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.service.JPAService;
031import org.apache.oozie.service.Services;
032
033/**
034 * Query Executor that provides API to run query for Bundle Action
035 */
036public class BundleActionQueryExecutor extends
037        QueryExecutor<BundleActionBean, BundleActionQueryExecutor.BundleActionQuery> {
038
039    public enum BundleActionQuery {
040        UPDATE_BUNDLE_ACTION_PENDING_MODTIME,
041        UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME,
042        UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID,
043        GET_BUNDLE_ACTION,
044        GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE,
045        GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN,
046        GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE
047    };
048
049    private static BundleActionQueryExecutor instance = new BundleActionQueryExecutor();
050
051    private BundleActionQueryExecutor() {
052    }
053
054    public static QueryExecutor<BundleActionBean, BundleActionQueryExecutor.BundleActionQuery> getInstance() {
055        return BundleActionQueryExecutor.instance;
056    }
057
058    @Override
059    public Query getUpdateQuery(BundleActionQuery namedQuery, BundleActionBean baBean, EntityManager em)
060            throws JPAExecutorException {
061
062        Query query = em.createNamedQuery(namedQuery.name());
063        switch (namedQuery) {
064            case UPDATE_BUNDLE_ACTION_PENDING_MODTIME:
065                query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp());
066                query.setParameter("pending", baBean.getPending());
067                query.setParameter("bundleActionId", baBean.getBundleActionId());
068                break;
069            case UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME:
070                query.setParameter("status", baBean.getStatusStr());
071                query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp());
072                query.setParameter("pending", baBean.getPending());
073                query.setParameter("bundleActionId", baBean.getBundleActionId());
074                break;
075            case UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID:
076                query.setParameter("status", baBean.getStatusStr());
077                query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp());
078                query.setParameter("pending", baBean.getPending());
079                query.setParameter("coordId", baBean.getCoordId());
080                query.setParameter("bundleActionId", baBean.getBundleActionId());
081                break;
082            default:
083                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
084                        + namedQuery.name());
085        }
086        return query;
087    }
088
089    @Override
090    public Query getSelectQuery(BundleActionQuery namedQuery, EntityManager em, Object... parameters)
091            throws JPAExecutorException {
092        Query query = em.createNamedQuery(namedQuery.name());
093        switch (namedQuery) {
094            case GET_BUNDLE_ACTION:
095                query.setParameter("bundleActionId", parameters[0]);
096                break;
097            case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE:
098                query.setParameter("bundleId", parameters[0]);
099                break;
100            case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN:
101                Timestamp ts = new Timestamp(System.currentTimeMillis() - (Long)parameters[0] * 1000);
102                query.setParameter("lastModifiedTime", ts);
103                break;
104            case GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE:
105                query.setParameter("bundleId", parameters[0]);
106                break;
107            default:
108                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
109                        + namedQuery.name());
110        }
111        return query;
112    }
113
114    @Override
115    public int executeUpdate(BundleActionQuery namedQuery, BundleActionBean jobBean) throws JPAExecutorException {
116        JPAService jpaService = Services.get().get(JPAService.class);
117        EntityManager em = jpaService.getEntityManager();
118        Query query = getUpdateQuery(namedQuery, jobBean, em);
119        int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
120        return ret;
121    }
122
123    @Override
124    public BundleActionBean get(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
125        BundleActionBean bean = getIfExist(namedQuery, parameters);
126        if (bean == null) {
127            throw new JPAExecutorException(ErrorCode.E0604, getSelectQuery(namedQuery,
128                    Services.get().get(JPAService.class).getEntityManager(), parameters));
129        }
130        return bean;
131    }
132
133    private BundleActionBean constructBean(BundleActionQuery namedQuery, Object ret) throws JPAExecutorException {
134        BundleActionBean bean;
135        Object[] arr;
136        switch (namedQuery) {
137            case GET_BUNDLE_ACTION:
138            case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE:
139                bean = (BundleActionBean) ret;
140                break;
141            case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN:
142                bean = new BundleActionBean();
143                arr = (Object[]) ret;
144                bean.setBundleActionId((String) arr[0]);
145                bean.setBundleId((String) arr[1]);
146                bean.setStatusStr((String) arr[2]);
147                bean.setCoordId((String) arr[3]);
148                bean.setCoordName((String) arr[4]);
149                break;
150            case GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE:
151                bean = new BundleActionBean();
152                arr = (Object[]) ret;
153                bean.setCoordId((String) arr[0]);
154                bean.setStatusStr((String) arr[1]);
155                bean.setPending((Integer) arr[2]);
156                break;
157            default:
158                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
159                        + namedQuery.name());
160        }
161        return bean;
162    }
163
164    @Override
165    public List<BundleActionBean> getList(BundleActionQuery namedQuery, Object... parameters)
166            throws JPAExecutorException {
167        JPAService jpaService = Services.get().get(JPAService.class);
168        EntityManager em = jpaService.getEntityManager();
169        Query query = getSelectQuery(namedQuery, em, parameters);
170        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
171        List<BundleActionBean> beanList = new ArrayList<BundleActionBean>();
172        if (retList != null) {
173            for (Object ret : retList) {
174                beanList.add(constructBean(namedQuery, ret));
175            }
176        }
177        return beanList;
178    }
179
180    @Override
181    public Object getSingleValue(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
182        throw new UnsupportedOperationException();
183    }
184
185    @Override
186    public BundleActionBean getIfExist(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
187        JPAService jpaService = Services.get().get(JPAService.class);
188        EntityManager em = jpaService.getEntityManager();
189        Query query = getSelectQuery(namedQuery, em, parameters);
190        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
191        if (ret == null) {
192            return null;
193        }
194        BundleActionBean bean = constructBean(namedQuery, ret);
195        return bean;
196    }
197
198}