001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import java.sql.Timestamp;
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024
025import javax.persistence.EntityManager;
026import javax.persistence.Query;
027
028import org.apache.oozie.BundleActionBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.service.JPAService;
031import org.apache.oozie.service.Services;
032
033/**
034 * Query Executor that provides API to run query for Bundle Action
035 */
036public class BundleActionQueryExecutor extends
037        QueryExecutor<BundleActionBean, BundleActionQueryExecutor.BundleActionQuery> {
038
039    public enum BundleActionQuery {
040        UPDATE_BUNDLE_ACTION_PENDING_MODTIME,
041        UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME,
042        UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID,
043        GET_BUNDLE_ACTION,
044        GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE,
045        GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME,
046        GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN,
047        GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE
048    };
049
050    private static BundleActionQueryExecutor instance = new BundleActionQueryExecutor();
051
052    private BundleActionQueryExecutor() {
053    }
054
055    public static QueryExecutor<BundleActionBean, BundleActionQueryExecutor.BundleActionQuery> getInstance() {
056        return BundleActionQueryExecutor.instance;
057    }
058
059    @Override
060    public Query getUpdateQuery(BundleActionQuery namedQuery, BundleActionBean baBean, EntityManager em)
061            throws JPAExecutorException {
062
063        Query query = em.createNamedQuery(namedQuery.name());
064        switch (namedQuery) {
065            case UPDATE_BUNDLE_ACTION_PENDING_MODTIME:
066                query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp());
067                query.setParameter("pending", baBean.getPending());
068                query.setParameter("bundleActionId", baBean.getBundleActionId());
069                break;
070            case UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME:
071                query.setParameter("status", baBean.getStatusStr());
072                query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp());
073                query.setParameter("pending", baBean.getPending());
074                query.setParameter("bundleActionId", baBean.getBundleActionId());
075                break;
076            case UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID:
077                query.setParameter("status", baBean.getStatusStr());
078                query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp());
079                query.setParameter("pending", baBean.getPending());
080                query.setParameter("coordId", baBean.getCoordId());
081                query.setParameter("bundleActionId", baBean.getBundleActionId());
082                break;
083            default:
084                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
085                        + namedQuery.name());
086        }
087        return query;
088    }
089
090    @Override
091    public Query getSelectQuery(BundleActionQuery namedQuery, EntityManager em, Object... parameters)
092            throws JPAExecutorException {
093        Query query = em.createNamedQuery(namedQuery.name());
094        switch (namedQuery) {
095            case GET_BUNDLE_ACTION:
096                query.setParameter("bundleActionId", parameters[0]);
097                break;
098            case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE:
099                query.setParameter("bundleId", parameters[0]);
100                break;
101            case GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME:
102                query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime()));
103                break;
104            case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN:
105                Timestamp ts = new Timestamp(System.currentTimeMillis() - (Long)parameters[0] * 1000);
106                query.setParameter("lastModifiedTime", ts);
107                break;
108            case GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE:
109                query.setParameter("bundleId", parameters[0]);
110                break;
111            default:
112                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
113                        + namedQuery.name());
114        }
115        return query;
116    }
117
118    @Override
119    public int executeUpdate(BundleActionQuery namedQuery, BundleActionBean jobBean) throws JPAExecutorException {
120        JPAService jpaService = Services.get().get(JPAService.class);
121        EntityManager em = jpaService.getEntityManager();
122        Query query = getUpdateQuery(namedQuery, jobBean, em);
123        int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
124        return ret;
125    }
126
127    @Override
128    public BundleActionBean get(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
129        JPAService jpaService = Services.get().get(JPAService.class);
130        EntityManager em = jpaService.getEntityManager();
131        Query query = getSelectQuery(namedQuery, em, parameters);
132        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
133        if (ret == null) {
134            throw new JPAExecutorException(ErrorCode.E0604, query.toString());
135        }
136        BundleActionBean bean = constructBean(namedQuery, ret);
137        return bean;
138    }
139
140    private BundleActionBean constructBean(BundleActionQuery namedQuery, Object ret) throws JPAExecutorException {
141        BundleActionBean bean;
142        Object[] arr;
143        switch (namedQuery) {
144            case GET_BUNDLE_ACTION:
145            case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE:
146                bean = (BundleActionBean) ret;
147                break;
148            case GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME:
149                bean = new BundleActionBean();
150                bean.setBundleId((String) ret);
151                break;
152            case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN:
153                bean = new BundleActionBean();
154                arr = (Object[]) ret;
155                bean.setBundleActionId((String) arr[0]);
156                bean.setBundleId((String) arr[1]);
157                bean.setStatusStr((String) arr[2]);
158                bean.setCoordId((String) arr[3]);
159                bean.setCoordName((String) arr[4]);
160                break;
161            case GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE:
162                bean = new BundleActionBean();
163                arr = (Object[]) ret;
164                bean.setCoordId((String) arr[0]);
165                bean.setStatusStr((String) arr[1]);
166                bean.setPending((Integer) arr[2]);
167                break;
168            default:
169                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
170                        + namedQuery.name());
171        }
172        return bean;
173    }
174
175    @Override
176    public List<BundleActionBean> getList(BundleActionQuery namedQuery, Object... parameters)
177            throws JPAExecutorException {
178        JPAService jpaService = Services.get().get(JPAService.class);
179        EntityManager em = jpaService.getEntityManager();
180        Query query = getSelectQuery(namedQuery, em, parameters);
181        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
182        List<BundleActionBean> beanList = new ArrayList<BundleActionBean>();
183        if (retList != null) {
184            for (Object ret : retList) {
185                beanList.add(constructBean(namedQuery, ret));
186            }
187        }
188        return beanList;
189    }
190
191    @Override
192    public Object getSingleValue(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
193        throw new UnsupportedOperationException();
194    }
195
196}