/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.executor.jpa;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.persistence.EntityManager;
import javax.persistence.Query;

import org.apache.oozie.BundleJobBean;
import org.apache.oozie.BundleJobInfo;
import org.apache.oozie.StringBlob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.BundleJob.Timeunit;
import org.apache.oozie.store.StoreStatusFilter;
import org.apache.oozie.util.ParamChecker;
import org.apache.openjpa.persistence.OpenJPAPersistence;
import org.apache.openjpa.persistence.OpenJPAQuery;
import org.apache.openjpa.persistence.jdbc.FetchDirection;
import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
import org.apache.openjpa.persistence.jdbc.ResultSetType;

/**
 * Loads one page of bundle jobs matching a filter, together with the total number of matching jobs,
 * and returns both wrapped in a {@link BundleJobInfo}.
 */
public class BundleJobInfoGetJPAExecutor implements JPAExecutor<BundleJobInfo> {

    /** Number of rows requested per fetch batch from the OpenJPA fetch plan. */
    private static final int FETCH_BATCH_SIZE = 20;

    private final Map<String, List<String>> filter;
    private final int start;
    private final int len;

    /**
     * The constructor for BundleJobInfoGetJPAExecutor
     *
     * @param filter the filter map (filter name to list of values); must not be null
     * @param start start location for paging; 1-based, since {@code start - 1} is used as the offset of the
     *        first result
     * @param len maximum number of jobs to return in the page
     */
    public BundleJobInfoGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
        ParamChecker.notNull(filter, "filter");
        this.filter = filter;
        this.start = start;
        this.len = len;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
     */
    @Override
    public String getName() {
        return "BundleJobInfoGetJPAExecutor";
    }

    /**
     * Runs the page query and the matching count query, and assembles the result.
     * <p>
     * When the filter produces no conditions, the named queries {@code GET_BUNDLE_JOBS_COLUMNS} and
     * {@code GET_BUNDLE_JOBS_COUNT} are used. Otherwise the statement built by {@link StoreStatusFilter} is
     * used, ordered by creation time descending, and the count query is derived from the same statement by
     * swapping the select clause for the count clause.
     *
     * @param em the entity manager used to create and run both queries
     * @return the requested page of bundle jobs plus the requested {@code start}, {@code len} and the total
     *         number of matching jobs
     * @throws JPAExecutorException declared by the {@link JPAExecutor} contract; not thrown directly here
     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
     */
    @Override
    public BundleJobInfo execute(EntityManager em) throws JPAExecutorException {
        List<String> orArray = new ArrayList<String>();
        List<String> colArray = new ArrayList<String>();
        List<String> valArray = new ArrayList<String>();
        StringBuilder sb = new StringBuilder("");

        StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.bundleSeletStr,
                StoreStatusFilter.bundleCountStr);

        Query q;
        Query qTotal;
        if (orArray.isEmpty()) {
            q = em.createNamedQuery("GET_BUNDLE_JOBS_COLUMNS");
            qTotal = em.createNamedQuery("GET_BUNDLE_JOBS_COUNT");
        }
        else {
            // Derive the count statement before the ordering clause is appended; a count needs no ordering.
            String countStatement = sb.toString().replace(StoreStatusFilter.bundleSeletStr,
                    StoreStatusFilter.bundleCountStr);
            sb.append(" order by w.createdTimestamp desc ");
            q = em.createQuery(sb.toString());
            qTotal = em.createQuery(countStatement);
        }

        // Paging applies to the data query only; 'start' is 1-based while the JPA offset is 0-based.
        q.setFirstResult(start - 1);
        q.setMaxResults(len);

        bindFilterParameters(q, qTotal, orArray, colArray, valArray);
        configureFetchPlan(q);

        List<BundleJobBean> bundleBeansList = loadBeans(q);
        int realLen = ((Long) qTotal.getSingleResult()).intValue();

        return new BundleJobInfo(bundleBeansList, start, len, realLen);
    }

    /**
     * Binds every filter value to both the data query and the count query so that they select the same jobs.
     */
    private void bindFilterParameters(Query q, Query qTotal, List<String> orArray, List<String> colArray,
            List<String> valArray) {
        // NOTE(review): assumes StoreStatusFilter fills the three lists in lock-step (same size) - confirm.
        for (int i = 0; i < orArray.size(); i++) {
            q.setParameter(colArray.get(i), valArray.get(i));
            qTotal.setParameter(colArray.get(i), valArray.get(i));
        }
    }

    /**
     * Applies the OpenJPA JDBC fetch-plan settings used for the data query.
     */
    private void configureFetchPlan(Query q) {
        OpenJPAQuery kq = OpenJPAPersistence.cast(q);
        JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
        fetch.setFetchBatchSize(FETCH_BATCH_SIZE);
        fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
        fetch.setFetchDirection(FetchDirection.FORWARD);
        fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
    }

    /**
     * Executes the data query and converts each returned column array into a {@link BundleJobBean}.
     */
    private List<BundleJobBean> loadBeans(Query q) {
        // The query selects individual columns, so each row is expected to come back as an Object[].
        @SuppressWarnings("unchecked")
        List<Object[]> rows = (List<Object[]>) q.getResultList();

        List<BundleJobBean> beans = new ArrayList<BundleJobBean>();
        for (Object[] row : rows) {
            beans.add(getBeanForBundleJobFromArray(row));
        }
        return beans;
    }

    /**
     * Builds a {@link BundleJobBean} from one result row.
     * <p>
     * Column positions: 0 id, 1 app name, 2 app path, 3 conf blob, 4 status, 5 kickoff time, 6 start time,
     * 7 end time, 8 pause time, 9 created time, 10 user, 11 group, 12 time unit, 13 timeout. The id is always
     * set; every other column is copied only when it is non-null.
     * NOTE(review): the positions presumably mirror the select list of the queries used in
     * {@link #execute(EntityManager)} - confirm against those query definitions.
     *
     * @param arr the column values of one row, indexed as described above
     * @return the populated bean
     */
    private BundleJobBean getBeanForBundleJobFromArray(Object[] arr) {
        BundleJobBean bean = new BundleJobBean();
        bean.setId((String) arr[0]);
        if (arr[1] != null) {
            bean.setAppName((String) arr[1]);
        }
        if (arr[2] != null) {
            bean.setAppPath((String) arr[2]);
        }
        if (arr[3] != null) {
            bean.setConfBlob((StringBlob) arr[3]);
        }
        if (arr[4] != null) {
            bean.setStatus(Job.Status.valueOf((String) arr[4]));
        }
        if (arr[5] != null) {
            bean.setKickoffTime((Timestamp) arr[5]);
        }
        if (arr[6] != null) {
            bean.setStartTime((Timestamp) arr[6]);
        }
        if (arr[7] != null) {
            bean.setEndTime((Timestamp) arr[7]);
        }
        if (arr[8] != null) {
            bean.setPauseTime((Timestamp) arr[8]);
        }
        if (arr[9] != null) {
            bean.setCreatedTime((Timestamp) arr[9]);
        }
        if (arr[10] != null) {
            bean.setUser((String) arr[10]);
        }
        if (arr[11] != null) {
            bean.setGroup((String) arr[11]);
        }
        if (arr[12] != null) {
            bean.setTimeUnit(Timeunit.valueOf((String) arr[12]));
        }
        if (arr[13] != null) {
            bean.setTimeOut((Integer) arr[13]);
        }

        return bean;
    }
}