001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.executor.jpa; 019 020 import java.sql.Timestamp; 021 import java.util.ArrayList; 022 import java.util.List; 023 import java.util.Map; 024 025 import javax.persistence.EntityManager; 026 import javax.persistence.Query; 027 028 import org.apache.oozie.BundleJobBean; 029 import org.apache.oozie.BundleJobInfo; 030 import org.apache.oozie.client.Job; 031 import org.apache.oozie.client.BundleJob.Timeunit; 032 import org.apache.oozie.store.StoreStatusFilter; 033 import org.apache.oozie.util.ParamChecker; 034 import org.apache.openjpa.persistence.OpenJPAPersistence; 035 import org.apache.openjpa.persistence.OpenJPAQuery; 036 import org.apache.openjpa.persistence.jdbc.FetchDirection; 037 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan; 038 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm; 039 import org.apache.openjpa.persistence.jdbc.ResultSetType; 040 041 /** 042 * Load the BundleInfo and return it. 043 */ 044 public class BundleJobInfoGetJPAExecutor implements JPAExecutor<BundleJobInfo> { 045 046 private Map<String, List<String>> filter; 047 private int start = 1; 048 private int len = 50; 049 050 /** 051 * The constructor for BundleJobInfoGetJPAExecutor 052 * 053 * @param filter the filter string 054 * @param start start location for paging 055 * @param len total length to get 056 */ 057 public BundleJobInfoGetJPAExecutor(Map<String, List<String>> filter, int start, int len) { 058 ParamChecker.notNull(filter, "filter"); 059 this.filter = filter; 060 this.start = start; 061 this.len = len; 062 } 063 064 /* (non-Javadoc) 065 * @see org.apache.oozie.executor.jpa.JPAExecutor#getName() 066 */ 067 @Override 068 public String getName() { 069 return "BundleJobInfoGetJPAExecutor"; 070 } 071 072 /* (non-Javadoc) 073 * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager) 074 */ 075 @Override 076 @SuppressWarnings("unchecked") 077 public BundleJobInfo execute(EntityManager em) throws JPAExecutorException { 078 List<String> orArray = new ArrayList<String>(); 079 List<String> colArray = new ArrayList<String>(); 080 List<String> valArray = new ArrayList<String>(); 081 StringBuilder sb = new StringBuilder(""); 082 083 StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.bundleSeletStr, 084 StoreStatusFilter.bundleCountStr); 085 086 int realLen = 0; 087 088 Query q = null; 089 Query qTotal = null; 090 if (orArray.size() == 0) { 091 q = em.createNamedQuery("GET_BUNDLE_JOBS_COLUMNS"); 092 q.setFirstResult(start - 1); 093 q.setMaxResults(len); 094 qTotal = em.createNamedQuery("GET_BUNDLE_JOBS_COUNT"); 095 } 096 else { 097 StringBuilder sbTotal = new StringBuilder(sb); 098 sb.append(" order by w.createdTimestamp desc "); 099 q = em.createQuery(sb.toString()); 100 q.setFirstResult(start - 1); 101 q.setMaxResults(len); 102 qTotal = em.createQuery(sbTotal.toString().replace(StoreStatusFilter.bundleSeletStr, 103 StoreStatusFilter.bundleCountStr)); 104 } 105 106 for (int i = 0; i < orArray.size(); i++) { 107 q.setParameter(colArray.get(i), valArray.get(i)); 108 qTotal.setParameter(colArray.get(i), valArray.get(i)); 109 } 110 111 OpenJPAQuery kq = OpenJPAPersistence.cast(q); 112 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan(); 113 fetch.setFetchBatchSize(20); 114 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE); 115 fetch.setFetchDirection(FetchDirection.FORWARD); 116 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST); 117 List<?> resultList = q.getResultList(); 118 List<Object[]> objectArrList = (List<Object[]>) resultList; 119 List<BundleJobBean> bundleBeansList = new ArrayList<BundleJobBean>(); 120 121 for (Object[] arr : objectArrList) { 122 BundleJobBean bean = getBeanForBundleJobFromArray(arr); 123 bundleBeansList.add(bean); 124 } 125 126 realLen = ((Long) qTotal.getSingleResult()).intValue(); 127 128 return new BundleJobInfo(bundleBeansList, start, len, realLen); 129 } 130 131 private BundleJobBean getBeanForBundleJobFromArray(Object[] arr) { 132 133 BundleJobBean bean = new BundleJobBean(); 134 bean.setId((String) arr[0]); 135 if (arr[1] != null) { 136 bean.setAppName((String) arr[1]); 137 } 138 if (arr[2] != null) { 139 bean.setAppPath((String) arr[2]); 140 } 141 if (arr[3] != null) { 142 bean.setConf((String) arr[3]); 143 } 144 if (arr[4] != null) { 145 bean.setStatus(Job.Status.valueOf((String) arr[4])); 146 } 147 if (arr[5] != null) { 148 bean.setKickoffTime((Timestamp) arr[5]); 149 } 150 if (arr[6] != null) { 151 bean.setStartTime((Timestamp) arr[6]); 152 } 153 if (arr[7] != null) { 154 bean.setEndTime((Timestamp) arr[7]); 155 } 156 if (arr[8] != null) { 157 bean.setPauseTime((Timestamp) arr[8]); 158 } 159 if (arr[9] != null) { 160 bean.setCreatedTime((Timestamp) arr[9]); 161 } 162 if (arr[10] != null) { 163 bean.setUser((String) arr[10]); 164 } 165 if (arr[11] != null) { 166 bean.setGroup((String) arr[11]); 167 } 168 if (arr[12] != null) { 169 bean.setTimeUnit(Timeunit.valueOf((String) arr[12])); 170 } 171 if (arr[13] != null) { 172 bean.setTimeOut((Integer) arr[13]); 173 } 174 175 return bean; 176 } 177 }