This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.executor.jpa;
019
020 import java.sql.Timestamp;
021 import java.util.ArrayList;
022 import java.util.List;
023 import java.util.Map;
024
025 import javax.persistence.EntityManager;
026 import javax.persistence.Query;
027
028 import org.apache.oozie.BundleJobBean;
029 import org.apache.oozie.BundleJobInfo;
030 import org.apache.oozie.client.Job;
031 import org.apache.oozie.client.BundleJob.Timeunit;
032 import org.apache.oozie.store.StoreStatusFilter;
033 import org.apache.oozie.util.ParamChecker;
034 import org.apache.openjpa.persistence.OpenJPAPersistence;
035 import org.apache.openjpa.persistence.OpenJPAQuery;
036 import org.apache.openjpa.persistence.jdbc.FetchDirection;
037 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
038 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
039 import org.apache.openjpa.persistence.jdbc.ResultSetType;
040
041 /**
042 * Load the BundleInfo and return it.
043 */
044 public class BundleJobInfoGetJPAExecutor implements JPAExecutor<BundleJobInfo> {
045
046 private Map<String, List<String>> filter;
047 private int start = 1;
048 private int len = 50;
049
050 /**
051 * The constructor for BundleJobInfoGetJPAExecutor
052 *
053 * @param filter the filter string
054 * @param start start location for paging
055 * @param len total length to get
056 */
057 public BundleJobInfoGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
058 ParamChecker.notNull(filter, "filter");
059 this.filter = filter;
060 this.start = start;
061 this.len = len;
062 }
063
064 /* (non-Javadoc)
065 * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
066 */
067 @Override
068 public String getName() {
069 return "BundleJobInfoGetJPAExecutor";
070 }
071
072 /* (non-Javadoc)
073 * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
074 */
075 @Override
076 @SuppressWarnings("unchecked")
077 public BundleJobInfo execute(EntityManager em) throws JPAExecutorException {
078 List<String> orArray = new ArrayList<String>();
079 List<String> colArray = new ArrayList<String>();
080 List<String> valArray = new ArrayList<String>();
081 StringBuilder sb = new StringBuilder("");
082
083 StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.bundleSeletStr,
084 StoreStatusFilter.bundleCountStr);
085
086 int realLen = 0;
087
088 Query q = null;
089 Query qTotal = null;
090 if (orArray.size() == 0) {
091 q = em.createNamedQuery("GET_BUNDLE_JOBS_COLUMNS");
092 q.setFirstResult(start - 1);
093 q.setMaxResults(len);
094 qTotal = em.createNamedQuery("GET_BUNDLE_JOBS_COUNT");
095 }
096 else {
097 StringBuilder sbTotal = new StringBuilder(sb);
098 sb.append(" order by w.createdTimestamp desc ");
099 q = em.createQuery(sb.toString());
100 q.setFirstResult(start - 1);
101 q.setMaxResults(len);
102 qTotal = em.createQuery(sbTotal.toString().replace(StoreStatusFilter.bundleSeletStr,
103 StoreStatusFilter.bundleCountStr));
104 }
105
106 for (int i = 0; i < orArray.size(); i++) {
107 q.setParameter(colArray.get(i), valArray.get(i));
108 qTotal.setParameter(colArray.get(i), valArray.get(i));
109 }
110
111 OpenJPAQuery kq = OpenJPAPersistence.cast(q);
112 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
113 fetch.setFetchBatchSize(20);
114 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
115 fetch.setFetchDirection(FetchDirection.FORWARD);
116 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
117 List<?> resultList = q.getResultList();
118 List<Object[]> objectArrList = (List<Object[]>) resultList;
119 List<BundleJobBean> bundleBeansList = new ArrayList<BundleJobBean>();
120
121 for (Object[] arr : objectArrList) {
122 BundleJobBean bean = getBeanForBundleJobFromArray(arr);
123 bundleBeansList.add(bean);
124 }
125
126 realLen = ((Long) qTotal.getSingleResult()).intValue();
127
128 return new BundleJobInfo(bundleBeansList, start, len, realLen);
129 }
130
131 private BundleJobBean getBeanForBundleJobFromArray(Object[] arr) {
132
133 BundleJobBean bean = new BundleJobBean();
134 bean.setId((String) arr[0]);
135 if (arr[1] != null) {
136 bean.setAppName((String) arr[1]);
137 }
138 if (arr[2] != null) {
139 bean.setAppPath((String) arr[2]);
140 }
141 if (arr[3] != null) {
142 bean.setConf((String) arr[3]);
143 }
144 if (arr[4] != null) {
145 bean.setStatus(Job.Status.valueOf((String) arr[4]));
146 }
147 if (arr[5] != null) {
148 bean.setKickoffTime((Timestamp) arr[5]);
149 }
150 if (arr[6] != null) {
151 bean.setStartTime((Timestamp) arr[6]);
152 }
153 if (arr[7] != null) {
154 bean.setEndTime((Timestamp) arr[7]);
155 }
156 if (arr[8] != null) {
157 bean.setPauseTime((Timestamp) arr[8]);
158 }
159 if (arr[9] != null) {
160 bean.setCreatedTime((Timestamp) arr[9]);
161 }
162 if (arr[10] != null) {
163 bean.setUser((String) arr[10]);
164 }
165 if (arr[11] != null) {
166 bean.setGroup((String) arr[11]);
167 }
168 if (arr[12] != null) {
169 bean.setTimeUnit(Timeunit.valueOf((String) arr[12]));
170 }
171 if (arr[13] != null) {
172 bean.setTimeOut((Integer) arr[13]);
173 }
174
175 return bean;
176 }
177 }