/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.executor.jpa;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.persistence.EntityManager;
import javax.persistence.Query;

import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.CoordinatorJobInfo;
import org.apache.oozie.client.Job.Status;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.store.StoreStatusFilter;
import org.apache.oozie.util.ParamChecker;
import org.apache.openjpa.persistence.OpenJPAPersistence;
import org.apache.openjpa.persistence.OpenJPAQuery;
import org.apache.openjpa.persistence.jdbc.FetchDirection;
import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
import org.apache.openjpa.persistence.jdbc.ResultSetType;

/**
 * Loads one page of coordinator jobs, optionally narrowed by a filter, and wraps it in a
 * {@link CoordinatorJobInfo} together with the value returned by the matching count query.
 */
public class CoordJobInfoGetJPAExecutor implements JPAExecutor<CoordinatorJobInfo> {

    /** Filter map handed to {@link StoreStatusFilter#filter}; never null after construction. */
    private Map<String, List<String>> filter;

    /** 1-based offset of the first job to return ({@code start - 1} is used as the first result). */
    private int start = 1;

    /** Maximum number of jobs to return. */
    private int len = 50;

    /**
     * Creates the executor.
     *
     * @param filter filter criteria; validated with {@code ParamChecker.notNull}
     * @param start 1-based offset of the first job to fetch
     * @param len maximum number of jobs to fetch
     */
    public CoordJobInfoGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
        ParamChecker.notNull(filter, "filter");
        this.filter = filter;
        this.start = start;
        this.len = len;
    }

    /**
     * @return the name this executor reports for itself
     */
    @Override
    public String getName() {
        return "CoordInfoGetJPAExecutor";
    }

    /**
     * Runs the page query and the count query and assembles the result.
     * <p>
     * When the filter yields no OR entries the two named queries are used; otherwise the JPQL
     * produced by {@link StoreStatusFilter#filter} is used, ordered by creation time (newest
     * first) for the page query and rewritten into its count form for the count query.
     *
     * @param em entity manager used to create both queries
     * @return the fetched jobs plus {@code start}, {@code len} and the count query's result
     * @throws JPAExecutorException declared by the {@link JPAExecutor} contract
     */
    @Override
    @SuppressWarnings("unchecked")
    public CoordinatorJobInfo execute(EntityManager em) throws JPAExecutorException {
        List<String> orList = new ArrayList<String>();
        List<String> paramNames = new ArrayList<String>();
        List<String> paramValues = new ArrayList<String>();
        StringBuilder jpql = new StringBuilder();

        StoreStatusFilter.filter(filter, orList, paramNames, paramValues, jpql,
                StoreStatusFilter.coordSeletStr, StoreStatusFilter.coordCountStr);

        Query pageQuery;
        Query countQuery;
        if (!orList.isEmpty()) {
            // Snapshot the filtered select before the ordering is appended; the count query
            // is derived from this un-ordered text.
            String filteredSelect = jpql.toString();
            jpql.append(" order by w.createdTimestamp desc ");
            pageQuery = em.createQuery(jpql.toString());
            countQuery = em.createQuery(filteredSelect.replace(StoreStatusFilter.coordSeletStr,
                    StoreStatusFilter.coordCountStr));
        }
        else {
            pageQuery = em.createNamedQuery("GET_COORD_JOBS_COLUMNS");
            countQuery = em.createNamedQuery("GET_COORD_JOBS_COUNT");
        }

        // Paging applies to the page query only; "start" is 1-based, JPA offsets are 0-based.
        pageQuery.setFirstResult(start - 1);
        pageQuery.setMaxResults(len);

        // Bind every filter parameter on both queries so they select the same set of jobs.
        final int paramCount = orList.size();
        for (int idx = 0; idx < paramCount; idx++) {
            String name = paramNames.get(idx);
            String value = paramValues.get(idx);
            pageQuery.setParameter(name, value);
            countQuery.setParameter(name, value);
        }

        applyFetchPlan(pageQuery);

        List<Object[]> rows = (List<Object[]>) pageQuery.getResultList();
        List<CoordinatorJobBean> jobs = new ArrayList<CoordinatorJobBean>();
        for (Object[] row : rows) {
            jobs.add(getBeanForCoordinatorJobFromArray(row));
        }

        int total = ((Long) countQuery.getSingleResult()).intValue();

        return new CoordinatorJobInfo(jobs, start, len, total);
    }

    /**
     * Sets the OpenJPA JDBC fetch plan options on the given query: batch size 20,
     * scroll-insensitive result set, forward fetch direction and the LAST size algorithm.
     *
     * @param query query whose fetch plan is adjusted
     */
    private void applyFetchPlan(Query query) {
        OpenJPAQuery openJpaQuery = OpenJPAPersistence.cast(query);
        JDBCFetchPlan plan = (JDBCFetchPlan) openJpaQuery.getFetchPlan();
        plan.setFetchBatchSize(20);
        plan.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
        plan.setFetchDirection(FetchDirection.FORWARD);
        plan.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
    }

    /**
     * Builds a {@link CoordinatorJobBean} from one result row.
     * <p>
     * Element 0 (the id) is always copied; elements 1 to 15 are copied only when non-null.
     * NOTE(review): the element order presumably mirrors the column order of the select
     * queries used by {@link #execute} - confirm against their definitions.
     *
     * @param row one result row with at least 16 elements
     * @return the populated bean
     */
    private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] row) {
        CoordinatorJobBean job = new CoordinatorJobBean();

        job.setId((String) row[0]);

        // Identity, ownership and status.
        if (row[1] != null) {
            job.setAppName((String) row[1]);
        }
        if (row[2] != null) {
            job.setStatus(Status.valueOf((String) row[2]));
        }
        if (row[3] != null) {
            job.setUser((String) row[3]);
        }
        if (row[4] != null) {
            job.setGroup((String) row[4]);
        }

        // Schedule window and application location.
        if (row[5] != null) {
            job.setStartTime((Timestamp) row[5]);
        }
        if (row[6] != null) {
            job.setEndTime((Timestamp) row[6]);
        }
        if (row[7] != null) {
            job.setAppPath((String) row[7]);
        }

        // Execution settings.
        if (row[8] != null) {
            job.setConcurrency(((Integer) row[8]).intValue());
        }
        if (row[9] != null) {
            job.setFrequency(((Integer) row[9]).intValue());
        }

        // Progress and creation timestamps.
        if (row[10] != null) {
            job.setLastActionTime((Timestamp) row[10]);
        }
        if (row[11] != null) {
            job.setNextMaterializedTime((Timestamp) row[11]);
        }
        if (row[12] != null) {
            job.setCreatedTime((Timestamp) row[12]);
        }

        // Time unit, time zone and timeout.
        if (row[13] != null) {
            job.setTimeUnit(Timeunit.valueOf((String) row[13]));
        }
        if (row[14] != null) {
            job.setTimeZone((String) row[14]);
        }
        if (row[15] != null) {
            job.setTimeout((Integer) row[15]);
        }

        return job;
    }
}