001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import javax.persistence.EntityManager;
027import javax.persistence.Query;
028
029import org.apache.oozie.BundleJobBean;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.StringBlob;
032import org.apache.oozie.service.JPAService;
033import org.apache.oozie.service.Services;
034import org.apache.oozie.util.DateUtils;
035
036/**
037 * Query Executor that provides API to run query for Bundle Job
038 */
039public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJobQueryExecutor.BundleJobQuery> {
040
041    public enum BundleJobQuery {
042        UPDATE_BUNDLE_JOB,
043        UPDATE_BUNDLE_JOB_STATUS,
044        UPDATE_BUNDLE_JOB_STATUS_PENDING,
045        UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME,
046        UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME,
047        UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME,
048        UPDATE_BUNDLE_JOB_PAUSE_KICKOFF,
049        GET_BUNDLE_JOB,
050        GET_BUNDLE_JOB_STATUS,
051        GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME,
052        GET_BUNDLE_JOB_ID_JOBXML_CONF,
053        GET_BUNDLE_IDS_FOR_STATUS_TRANSIT
054    };
055
056    private static BundleJobQueryExecutor instance = new BundleJobQueryExecutor();
057
058    private BundleJobQueryExecutor() {
059    }
060
061    public static QueryExecutor<BundleJobBean, BundleJobQueryExecutor.BundleJobQuery> getInstance() {
062        return BundleJobQueryExecutor.instance;
063    }
064
065    @Override
066    public Query getUpdateQuery(BundleJobQuery namedQuery, BundleJobBean bjBean, EntityManager em)
067            throws JPAExecutorException {
068        Query query = em.createNamedQuery(namedQuery.name());
069        switch (namedQuery) {
070            case UPDATE_BUNDLE_JOB:
071                query.setParameter("appName", bjBean.getAppName());
072                query.setParameter("appPath", bjBean.getAppPath());
073                query.setParameter("conf", bjBean.getConfBlob());
074                query.setParameter("timeOut", bjBean.getTimeout());
075                query.setParameter("createdTime", bjBean.getCreatedTimestamp());
076                query.setParameter("endTime", bjBean.getEndTimestamp());
077                query.setParameter("jobXml", bjBean.getJobXmlBlob());
078                query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
079                query.setParameter("origJobXml", bjBean.getOrigJobXmlBlob());
080                query.setParameter("startTime", bjBean.getstartTimestamp());
081                query.setParameter("status", bjBean.getStatus().toString());
082                query.setParameter("timeUnit", bjBean.getTimeUnitStr());
083                query.setParameter("pending", bjBean.isPending() ? 1 : 0);
084                query.setParameter("id", bjBean.getId());
085                break;
086            case UPDATE_BUNDLE_JOB_STATUS:
087                query.setParameter("status", bjBean.getStatus().toString());
088                query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
089                query.setParameter("pending", bjBean.getPending());
090                query.setParameter("id", bjBean.getId());
091                break;
092            case UPDATE_BUNDLE_JOB_STATUS_PENDING:
093                query.setParameter("status", bjBean.getStatus().toString());
094                query.setParameter("pending", bjBean.getPending());
095                query.setParameter("id", bjBean.getId());
096                break;
097            case UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME:
098                query.setParameter("status", bjBean.getStatus().toString());
099                query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
100                query.setParameter("pending", bjBean.getPending());
101                query.setParameter("suspendedTime", bjBean.getSuspendedTimestamp());
102                query.setParameter("id", bjBean.getId());
103                break;
104            case UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME:
105                query.setParameter("status", bjBean.getStatus().toString());
106                query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
107                query.setParameter("pending", bjBean.getPending());
108                query.setParameter("id", bjBean.getId());
109                break;
110            case UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME:
111                query.setParameter("status", bjBean.getStatus().toString());
112                query.setParameter("pauseTime", bjBean.getPauseTimestamp());
113                query.setParameter("endTime", bjBean.getEndTimestamp());
114                query.setParameter("id", bjBean.getId());
115                break;
116            case UPDATE_BUNDLE_JOB_PAUSE_KICKOFF:
117                query.setParameter("pauseTime", bjBean.getPauseTimestamp());
118                query.setParameter("kickoffTime", bjBean.getKickoffTimestamp());
119                query.setParameter("id", bjBean.getId());
120                break;
121            default:
122                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
123                        + namedQuery.name());
124        }
125        return query;
126    }
127
128    @Override
129    public Query getSelectQuery(BundleJobQuery namedQuery, EntityManager em, Object... parameters)
130            throws JPAExecutorException {
131        Query query = em.createNamedQuery(namedQuery.name());
132        switch (namedQuery) {
133            case GET_BUNDLE_JOB:
134            case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME:
135            case GET_BUNDLE_JOB_ID_JOBXML_CONF:
136            case GET_BUNDLE_JOB_STATUS:
137                query.setParameter("id", parameters[0]);
138                break;
139            case GET_BUNDLE_IDS_FOR_STATUS_TRANSIT:
140                query.setParameter("lastModifiedTime", DateUtils.convertDateToTimestamp((Date)parameters[0]));
141                break;
142            default:
143                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
144                        + namedQuery.name());
145        }
146        return query;
147    }
148
149    @Override
150    public int executeUpdate(BundleJobQuery namedQuery, BundleJobBean jobBean) throws JPAExecutorException {
151        JPAService jpaService = Services.get().get(JPAService.class);
152        EntityManager em = jpaService.getEntityManager();
153        Query query = getUpdateQuery(namedQuery, jobBean, em);
154        int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
155        return ret;
156    }
157
158    @Override
159    public BundleJobBean get(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
160        BundleJobBean bean = getIfExist(namedQuery, parameters);
161        if (bean == null) {
162            throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery,
163                    Services.get().get(JPAService.class).getEntityManager(), parameters).toString());
164        }
165        return bean;
166    }
167
168    @Override
169    public BundleJobBean getIfExist(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
170        JPAService jpaService = Services.get().get(JPAService.class);
171        EntityManager em = jpaService.getEntityManager();
172        Query query = getSelectQuery(namedQuery, em, parameters);
173        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
174        if (ret == null) {
175            return null;
176        }
177        BundleJobBean bean = constructBean(namedQuery, ret, parameters);
178        return bean;
179    }
180
181    @Override
182    public List<BundleJobBean> getList(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
183        JPAService jpaService = Services.get().get(JPAService.class);
184        EntityManager em = jpaService.getEntityManager();
185        Query query = getSelectQuery(namedQuery, em, parameters);
186        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
187        List<BundleJobBean> beanList = new ArrayList<BundleJobBean>();
188        if (retList != null) {
189            for (Object ret : retList) {
190                beanList.add(constructBean(namedQuery, ret));
191            }
192        }
193        return beanList;
194    }
195
196    private BundleJobBean constructBean(BundleJobQuery namedQuery, Object ret, Object... parameters)
197            throws JPAExecutorException {
198        BundleJobBean bean;
199        Object[] arr;
200        switch (namedQuery) {
201            case GET_BUNDLE_JOB:
202                bean = (BundleJobBean) ret;
203                break;
204            case GET_BUNDLE_JOB_STATUS:
205                bean = new BundleJobBean();
206                bean.setId((String) parameters[0]);
207                bean.setStatus((String) ret);
208                break;
209            case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME:
210                bean = new BundleJobBean();
211                arr = (Object[]) ret;
212                bean.setId((String) arr[0]);
213                bean.setStatus((String) arr[1]);
214                bean.setPending((Integer) arr[2]);
215                bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[3]));
216                bean.setPauseTime(DateUtils.toDate((Timestamp) arr[4]));
217                bean.setSuspendedTime(DateUtils.toDate((Timestamp) arr[5]));
218
219                break;
220            case GET_BUNDLE_JOB_ID_JOBXML_CONF:
221                bean = new BundleJobBean();
222                arr = (Object[]) ret;
223                bean.setId((String) arr[0]);
224                bean.setJobXmlBlob((StringBlob) arr[1]);
225                bean.setConfBlob((StringBlob) arr[2]);
226                break;
227
228            case GET_BUNDLE_IDS_FOR_STATUS_TRANSIT:
229                bean = new BundleJobBean();
230                bean.setId((String) ret);
231                break;
232
233            default:
234                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
235                        + namedQuery.name());
236        }
237        return bean;
238    }
239
240    @Override
241    public Object getSingleValue(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
242        throw new UnsupportedOperationException();
243    }
244}