001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.executor.jpa; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025 026import javax.persistence.EntityManager; 027import javax.persistence.Query; 028 029import org.apache.oozie.CoordinatorJobBean; 030import org.apache.oozie.CoordinatorJobInfo; 031import org.apache.oozie.client.Job.Status; 032import org.apache.oozie.client.CoordinatorJob.Timeunit; 033import org.apache.oozie.store.StoreStatusFilter; 034import org.apache.oozie.util.ParamChecker; 035import org.apache.openjpa.persistence.OpenJPAPersistence; 036import org.apache.openjpa.persistence.OpenJPAQuery; 037import org.apache.openjpa.persistence.jdbc.FetchDirection; 038import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan; 039import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm; 040import org.apache.openjpa.persistence.jdbc.ResultSetType; 041 042/** 043 * Load the CoordinatorInfo and return it. 044 */ 045public class CoordJobInfoGetJPAExecutor implements JPAExecutor<CoordinatorJobInfo> { 046 047 public static final String DEFAULT_ORDER_BY = " order by w.createdTimestamp desc "; 048 private Map<String, List<String>> filter; 049 private int start = 1; 050 private int len = 50; 051 052 public CoordJobInfoGetJPAExecutor(Map<String, List<String>> filter, int start, int len) { 053 ParamChecker.notNull(filter, "filter"); 054 this.filter = filter; 055 this.start = start; 056 this.len = len; 057 } 058 059 @Override 060 public String getName() { 061 return "CoordJobInfoGetJPAExecutor"; 062 } 063 064 @Override 065 @SuppressWarnings("unchecked") 066 public CoordinatorJobInfo execute(EntityManager em) throws JPAExecutorException { 067 List<String> orArray = new ArrayList<String>(); 068 List<String> colArray = new ArrayList<String>(); 069 List<Object> valArray = new ArrayList<Object>(); 070 StringBuilder sb = new StringBuilder(""); 071 String orderBy = DEFAULT_ORDER_BY; 072 073 StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr, 074 StoreStatusFilter.coordCountStr); 075 076 orderBy = StoreStatusFilter.getSortBy(filter, orderBy); 077 int realLen = 0; 078 079 Query q = null; 080 Query qTotal = null; 081 if (orArray.size() == 0 && orderBy.equals(DEFAULT_ORDER_BY)) { 082 q = em.createNamedQuery("GET_COORD_JOBS_COLUMNS"); 083 q.setFirstResult(start - 1); 084 q.setMaxResults(len); 085 qTotal = em.createNamedQuery("GET_COORD_JOBS_COUNT"); 086 } 087 else { 088 sb = sb.toString().trim().length() == 0 ? sb.append(StoreStatusFilter.coordSeletStr) : sb; 089 String sbTotal = sb.toString(); 090 sb.append(orderBy); 091 q = em.createQuery(sb.toString()); 092 q.setFirstResult(start - 1); 093 q.setMaxResults(len); 094 qTotal = em.createQuery(sbTotal.replace(StoreStatusFilter.coordSeletStr, 095 StoreStatusFilter.coordCountStr)); 096 } 097 098 for (int i = 0; i < orArray.size(); i++) { 099 q.setParameter(colArray.get(i), valArray.get(i)); 100 qTotal.setParameter(colArray.get(i), valArray.get(i)); 101 } 102 103 OpenJPAQuery kq = OpenJPAPersistence.cast(q); 104 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan(); 105 fetch.setFetchBatchSize(20); 106 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE); 107 fetch.setFetchDirection(FetchDirection.FORWARD); 108 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST); 109 List<?> resultList = q.getResultList(); 110 List<Object[]> objectArrList = (List<Object[]>) resultList; 111 List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>(); 112 113 for (Object[] arr : objectArrList) { 114 CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr); 115 coordBeansList.add(ww); 116 } 117 118 realLen = ((Long) qTotal.getSingleResult()).intValue(); 119 120 return new CoordinatorJobInfo(coordBeansList, start, len, realLen); 121 } 122 123 private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) { 124 CoordinatorJobBean bean = new CoordinatorJobBean(); 125 bean.setId((String) arr[0]); 126 if (arr[1] != null) { 127 bean.setAppName((String) arr[1]); 128 } 129 if (arr[2] != null) { 130 bean.setStatus(Status.valueOf((String) arr[2])); 131 } 132 if (arr[3] != null) { 133 bean.setUser((String) arr[3]); 134 } 135 if (arr[4] != null) { 136 bean.setGroup((String) arr[4]); 137 } 138 if (arr[5] != null) { 139 bean.setStartTime((Timestamp) arr[5]); 140 } 141 if (arr[6] != null) { 142 bean.setEndTime((Timestamp) arr[6]); 143 } 144 if (arr[7] != null) { 145 bean.setAppPath((String) arr[7]); 146 } 147 if (arr[8] != null) { 148 bean.setConcurrency(((Integer) arr[8]).intValue()); 149 } 150 if (arr[9] != null) { 151 bean.setFrequency((String) arr[9]); 152 } 153 if (arr[10] != null) { 154 bean.setLastActionTime((Timestamp) arr[10]); 155 } 156 if (arr[11] != null) { 157 bean.setNextMaterializedTime((Timestamp) arr[11]); 158 } 159 if (arr[12] != null) { 160 bean.setCreatedTime((Timestamp) arr[12]); 161 } 162 if (arr[13] != null) { 163 bean.setTimeUnit(Timeunit.valueOf((String) arr[13])); 164 } 165 if (arr[14] != null) { 166 bean.setTimeZone((String) arr[14]); 167 } 168 if (arr[15] != null) { 169 bean.setTimeout((Integer) arr[15]); 170 } 171 if (arr[16] != null) { 172 bean.setBundleId((String) arr[16]); 173 } 174 return bean; 175 } 176}