001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import java.sql.Timestamp;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024
025import javax.persistence.EntityManager;
026import javax.persistence.Query;
027
028import org.apache.oozie.CoordinatorJobBean;
029import org.apache.oozie.CoordinatorJobInfo;
030import org.apache.oozie.client.Job.Status;
031import org.apache.oozie.client.CoordinatorJob.Timeunit;
032import org.apache.oozie.store.StoreStatusFilter;
033import org.apache.oozie.util.ParamChecker;
034import org.apache.openjpa.persistence.OpenJPAPersistence;
035import org.apache.openjpa.persistence.OpenJPAQuery;
036import org.apache.openjpa.persistence.jdbc.FetchDirection;
037import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
038import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
039import org.apache.openjpa.persistence.jdbc.ResultSetType;
040
041/**
042 * Load the CoordinatorInfo and return it.
043 */
044public class CoordJobInfoGetJPAExecutor implements JPAExecutor<CoordinatorJobInfo> {
045
046    private Map<String, List<String>> filter;
047    private int start = 1;
048    private int len = 50;
049
050    public CoordJobInfoGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
051        ParamChecker.notNull(filter, "filter");
052        this.filter = filter;
053        this.start = start;
054        this.len = len;
055    }
056
057    @Override
058    public String getName() {
059        return "CoordInfoGetJPAExecutor";
060    }
061
062    @Override
063    @SuppressWarnings("unchecked")
064    public CoordinatorJobInfo execute(EntityManager em) throws JPAExecutorException {
065        List<String> orArray = new ArrayList<String>();
066        List<String> colArray = new ArrayList<String>();
067        List<String> valArray = new ArrayList<String>();
068        StringBuilder sb = new StringBuilder("");
069
070        StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr,
071                                 StoreStatusFilter.coordCountStr);
072
073        int realLen = 0;
074
075        Query q = null;
076        Query qTotal = null;
077        if (orArray.size() == 0) {
078            q = em.createNamedQuery("GET_COORD_JOBS_COLUMNS");
079            q.setFirstResult(start - 1);
080            q.setMaxResults(len);
081            qTotal = em.createNamedQuery("GET_COORD_JOBS_COUNT");
082        }
083        else {
084            StringBuilder sbTotal = new StringBuilder(sb);
085            sb.append(" order by w.createdTimestamp desc ");
086            q = em.createQuery(sb.toString());
087            q.setFirstResult(start - 1);
088            q.setMaxResults(len);
089            qTotal = em.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr,
090                                                                          StoreStatusFilter.coordCountStr));
091        }
092
093        for (int i = 0; i < orArray.size(); i++) {
094            q.setParameter(colArray.get(i), valArray.get(i));
095            qTotal.setParameter(colArray.get(i), valArray.get(i));
096        }
097
098        OpenJPAQuery kq = OpenJPAPersistence.cast(q);
099        JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
100        fetch.setFetchBatchSize(20);
101        fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
102        fetch.setFetchDirection(FetchDirection.FORWARD);
103        fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
104        List<?> resultList = q.getResultList();
105        List<Object[]> objectArrList = (List<Object[]>) resultList;
106        List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>();
107
108        for (Object[] arr : objectArrList) {
109            CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr);
110            coordBeansList.add(ww);
111        }
112
113        realLen = ((Long) qTotal.getSingleResult()).intValue();
114
115        return new CoordinatorJobInfo(coordBeansList, start, len, realLen);
116    }
117
118    private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) {
119        CoordinatorJobBean bean = new CoordinatorJobBean();
120        bean.setId((String) arr[0]);
121        if (arr[1] != null) {
122            bean.setAppName((String) arr[1]);
123        }
124        if (arr[2] != null) {
125            bean.setStatus(Status.valueOf((String) arr[2]));
126        }
127        if (arr[3] != null) {
128            bean.setUser((String) arr[3]);
129        }
130        if (arr[4] != null) {
131            bean.setGroup((String) arr[4]);
132        }
133        if (arr[5] != null) {
134            bean.setStartTime((Timestamp) arr[5]);
135        }
136        if (arr[6] != null) {
137            bean.setEndTime((Timestamp) arr[6]);
138        }
139        if (arr[7] != null) {
140            bean.setAppPath((String) arr[7]);
141        }
142        if (arr[8] != null) {
143            bean.setConcurrency(((Integer) arr[8]).intValue());
144        }
145        if (arr[9] != null) {
146            bean.setFrequency((String) arr[9]);
147        }
148        if (arr[10] != null) {
149            bean.setLastActionTime((Timestamp) arr[10]);
150        }
151        if (arr[11] != null) {
152            bean.setNextMaterializedTime((Timestamp) arr[11]);
153        }
154        if (arr[12] != null) {
155            bean.setCreatedTime((Timestamp) arr[12]);
156        }
157        if (arr[13] != null) {
158            bean.setTimeUnit(Timeunit.valueOf((String) arr[13]));
159        }
160        if (arr[14] != null) {
161            bean.setTimeZone((String) arr[14]);
162        }
163        if (arr[15] != null) {
164            bean.setTimeout((Integer) arr[15]);
165        }
166        return bean;
167    }
168}