001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import java.sql.Timestamp;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024
025import javax.persistence.EntityManager;
026import javax.persistence.Query;
027
028import org.apache.oozie.BundleJobBean;
029import org.apache.oozie.BundleJobInfo;
030import org.apache.oozie.StringBlob;
031import org.apache.oozie.client.Job;
032import org.apache.oozie.client.BundleJob.Timeunit;
033import org.apache.oozie.store.StoreStatusFilter;
034import org.apache.oozie.util.ParamChecker;
035import org.apache.openjpa.persistence.OpenJPAPersistence;
036import org.apache.openjpa.persistence.OpenJPAQuery;
037import org.apache.openjpa.persistence.jdbc.FetchDirection;
038import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
039import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
040import org.apache.openjpa.persistence.jdbc.ResultSetType;
041
042/**
043 * Load the BundleInfo and return it.
044 */
045public class BundleJobInfoGetJPAExecutor implements JPAExecutor<BundleJobInfo> {
046
047    private Map<String, List<String>> filter;
048    private int start = 1;
049    private int len = 50;
050
051    /**
052     * The constructor for BundleJobInfoGetJPAExecutor
053     *
054     * @param filter the filter string
055     * @param start start location for paging
056     * @param len total length to get
057     */
058    public BundleJobInfoGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
059        ParamChecker.notNull(filter, "filter");
060        this.filter = filter;
061        this.start = start;
062        this.len = len;
063    }
064
065    /* (non-Javadoc)
066     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
067     */
068    @Override
069    public String getName() {
070        return "BundleJobInfoGetJPAExecutor";
071    }
072
073    /* (non-Javadoc)
074     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
075     */
076    @Override
077    @SuppressWarnings("unchecked")
078    public BundleJobInfo execute(EntityManager em) throws JPAExecutorException {
079        List<String> orArray = new ArrayList<String>();
080        List<String> colArray = new ArrayList<String>();
081        List<String> valArray = new ArrayList<String>();
082        StringBuilder sb = new StringBuilder("");
083
084        StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.bundleSeletStr,
085                                 StoreStatusFilter.bundleCountStr);
086
087        int realLen = 0;
088
089        Query q = null;
090        Query qTotal = null;
091        if (orArray.size() == 0) {
092            q = em.createNamedQuery("GET_BUNDLE_JOBS_COLUMNS");
093            q.setFirstResult(start - 1);
094            q.setMaxResults(len);
095            qTotal = em.createNamedQuery("GET_BUNDLE_JOBS_COUNT");
096        }
097        else {
098            StringBuilder sbTotal = new StringBuilder(sb);
099            sb.append(" order by w.createdTimestamp desc ");
100            q = em.createQuery(sb.toString());
101            q.setFirstResult(start - 1);
102            q.setMaxResults(len);
103            qTotal = em.createQuery(sbTotal.toString().replace(StoreStatusFilter.bundleSeletStr,
104                                                                          StoreStatusFilter.bundleCountStr));
105        }
106
107        for (int i = 0; i < orArray.size(); i++) {
108            q.setParameter(colArray.get(i), valArray.get(i));
109            qTotal.setParameter(colArray.get(i), valArray.get(i));
110        }
111
112        OpenJPAQuery kq = OpenJPAPersistence.cast(q);
113        JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
114        fetch.setFetchBatchSize(20);
115        fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
116        fetch.setFetchDirection(FetchDirection.FORWARD);
117        fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
118        List<?> resultList = q.getResultList();
119        List<Object[]> objectArrList = (List<Object[]>) resultList;
120        List<BundleJobBean> bundleBeansList = new ArrayList<BundleJobBean>();
121
122        for (Object[] arr : objectArrList) {
123            BundleJobBean bean = getBeanForBundleJobFromArray(arr);
124            bundleBeansList.add(bean);
125        }
126
127        realLen = ((Long) qTotal.getSingleResult()).intValue();
128
129        return new BundleJobInfo(bundleBeansList, start, len, realLen);
130    }
131
132    private BundleJobBean getBeanForBundleJobFromArray(Object[] arr) {
133
134        BundleJobBean bean = new BundleJobBean();
135        bean.setId((String) arr[0]);
136        if (arr[1] != null) {
137            bean.setAppName((String) arr[1]);
138        }
139        if (arr[2] != null) {
140            bean.setAppPath((String) arr[2]);
141        }
142        if (arr[3] != null) {
143            bean.setConfBlob((StringBlob) arr[3]);
144        }
145        if (arr[4] != null) {
146            bean.setStatus(Job.Status.valueOf((String) arr[4]));
147        }
148        if (arr[5] != null) {
149            bean.setKickoffTime((Timestamp) arr[5]);
150        }
151        if (arr[6] != null) {
152            bean.setStartTime((Timestamp) arr[6]);
153        }
154        if (arr[7] != null) {
155            bean.setEndTime((Timestamp) arr[7]);
156        }
157        if (arr[8] != null) {
158            bean.setPauseTime((Timestamp) arr[8]);
159        }
160        if (arr[9] != null) {
161            bean.setCreatedTime((Timestamp) arr[9]);
162        }
163        if (arr[10] != null) {
164            bean.setUser((String) arr[10]);
165        }
166        if (arr[11] != null) {
167            bean.setGroup((String) arr[11]);
168        }
169        if (arr[12] != null) {
170            bean.setTimeUnit(Timeunit.valueOf((String) arr[12]));
171        }
172        if (arr[13] != null) {
173            bean.setTimeOut((Integer) arr[13]);
174        }
175
176        return bean;
177    }
178}