001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025
026import javax.persistence.EntityManager;
027import javax.persistence.Query;
028
029import org.apache.oozie.CoordinatorJobBean;
030import org.apache.oozie.CoordinatorJobInfo;
031import org.apache.oozie.client.Job.Status;
032import org.apache.oozie.client.CoordinatorJob.Timeunit;
033import org.apache.oozie.store.StoreStatusFilter;
034import org.apache.oozie.util.ParamChecker;
035import org.apache.openjpa.persistence.OpenJPAPersistence;
036import org.apache.openjpa.persistence.OpenJPAQuery;
037import org.apache.openjpa.persistence.jdbc.FetchDirection;
038import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
039import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
040import org.apache.openjpa.persistence.jdbc.ResultSetType;
041
042/**
043 * Load the CoordinatorInfo and return it.
044 */
045public class CoordJobInfoGetJPAExecutor implements JPAExecutor<CoordinatorJobInfo> {
046
047    public static final String DEFAULT_ORDER_BY = " order by w.createdTimestamp desc ";
048    private Map<String, List<String>> filter;
049    private int start = 1;
050    private int len = 50;
051
052    public CoordJobInfoGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
053        ParamChecker.notNull(filter, "filter");
054        this.filter = filter;
055        this.start = start;
056        this.len = len;
057    }
058
059    @Override
060    public String getName() {
061        return "CoordJobInfoGetJPAExecutor";
062    }
063
064    @Override
065    @SuppressWarnings("unchecked")
066    public CoordinatorJobInfo execute(EntityManager em) throws JPAExecutorException {
067        List<String> orArray = new ArrayList<String>();
068        List<String> colArray = new ArrayList<String>();
069        List<Object> valArray = new ArrayList<Object>();
070        StringBuilder sb = new StringBuilder("");
071        String orderBy = DEFAULT_ORDER_BY;
072
073        StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr,
074                                 StoreStatusFilter.coordCountStr);
075
076        orderBy = StoreStatusFilter.getSortBy(filter, orderBy);
077        int realLen = 0;
078
079        Query q = null;
080        Query qTotal = null;
081        if (orArray.size() == 0 && orderBy.equals(DEFAULT_ORDER_BY)) {
082            q = em.createNamedQuery("GET_COORD_JOBS_COLUMNS");
083            q.setFirstResult(start - 1);
084            q.setMaxResults(len);
085            qTotal = em.createNamedQuery("GET_COORD_JOBS_COUNT");
086        }
087        else {
088            sb = sb.toString().trim().length() == 0 ? sb.append(StoreStatusFilter.coordSeletStr) : sb;
089            String sbTotal = sb.toString();
090            sb.append(orderBy);
091            q = em.createQuery(sb.toString());
092            q.setFirstResult(start - 1);
093            q.setMaxResults(len);
094            qTotal = em.createQuery(sbTotal.replace(StoreStatusFilter.coordSeletStr,
095                                                                          StoreStatusFilter.coordCountStr));
096        }
097
098        for (int i = 0; i < orArray.size(); i++) {
099            q.setParameter(colArray.get(i), valArray.get(i));
100            qTotal.setParameter(colArray.get(i), valArray.get(i));
101        }
102
103        OpenJPAQuery kq = OpenJPAPersistence.cast(q);
104        JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
105        fetch.setFetchBatchSize(20);
106        fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
107        fetch.setFetchDirection(FetchDirection.FORWARD);
108        fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
109        List<?> resultList = q.getResultList();
110        List<Object[]> objectArrList = (List<Object[]>) resultList;
111        List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>();
112
113        for (Object[] arr : objectArrList) {
114            CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr);
115            coordBeansList.add(ww);
116        }
117
118        realLen = ((Long) qTotal.getSingleResult()).intValue();
119
120        return new CoordinatorJobInfo(coordBeansList, start, len, realLen);
121    }
122
123    private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) {
124        CoordinatorJobBean bean = new CoordinatorJobBean();
125        bean.setId((String) arr[0]);
126        if (arr[1] != null) {
127            bean.setAppName((String) arr[1]);
128        }
129        if (arr[2] != null) {
130            bean.setStatus(Status.valueOf((String) arr[2]));
131        }
132        if (arr[3] != null) {
133            bean.setUser((String) arr[3]);
134        }
135        if (arr[4] != null) {
136            bean.setGroup((String) arr[4]);
137        }
138        if (arr[5] != null) {
139            bean.setStartTime((Timestamp) arr[5]);
140        }
141        if (arr[6] != null) {
142            bean.setEndTime((Timestamp) arr[6]);
143        }
144        if (arr[7] != null) {
145            bean.setAppPath((String) arr[7]);
146        }
147        if (arr[8] != null) {
148            bean.setConcurrency(((Integer) arr[8]).intValue());
149        }
150        if (arr[9] != null) {
151            bean.setFrequency((String) arr[9]);
152        }
153        if (arr[10] != null) {
154            bean.setLastActionTime((Timestamp) arr[10]);
155        }
156        if (arr[11] != null) {
157            bean.setNextMaterializedTime((Timestamp) arr[11]);
158        }
159        if (arr[12] != null) {
160            bean.setCreatedTime((Timestamp) arr[12]);
161        }
162        if (arr[13] != null) {
163            bean.setTimeUnit(Timeunit.valueOf((String) arr[13]));
164        }
165        if (arr[14] != null) {
166            bean.setTimeZone((String) arr[14]);
167        }
168        if (arr[15] != null) {
169            bean.setTimeout((Integer) arr[15]);
170        }
171        if (arr[16] != null) {
172            bean.setBundleId((String) arr[16]);
173        }
174        return bean;
175    }
176}