001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.executor.jpa; 019 020 import java.sql.Timestamp; 021 import java.util.ArrayList; 022 import java.util.List; 023 import java.util.Map; 024 025 import javax.persistence.EntityManager; 026 import javax.persistence.Query; 027 028 import org.apache.oozie.WorkflowJobBean; 029 import org.apache.oozie.WorkflowsInfo; 030 import org.apache.oozie.client.OozieClient; 031 import org.apache.oozie.client.WorkflowJob.Status; 032 import org.apache.oozie.util.XLog; 033 import org.apache.openjpa.persistence.OpenJPAPersistence; 034 import org.apache.openjpa.persistence.OpenJPAQuery; 035 import org.apache.openjpa.persistence.jdbc.FetchDirection; 036 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan; 037 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm; 038 import org.apache.openjpa.persistence.jdbc.ResultSetType; 039 040 public class WorkflowsJobGetJPAExecutor implements JPAExecutor<WorkflowsInfo> { 041 042 private static final String seletStr = "Select w.id, w.appName, w.status, w.run, w.user, w.group, w.createdTimestamp, " 043 + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp, w.externalId from WorkflowJobBean w"; 044 private static final String countStr = "Select count(w) from WorkflowJobBean w"; 045 046 private final Map<String, List<String>> filter; 047 private final int start; 048 private final int len; 049 050 /** 051 * This JPA Executor gets the workflows info for the range. 052 * 053 * @param filter 054 * @param start 055 * @param len 056 */ 057 public WorkflowsJobGetJPAExecutor(Map<String, List<String>> filter, int start, int len) { 058 this.filter = filter; 059 this.start = start; 060 this.len = len; 061 } 062 063 /* (non-Javadoc) 064 * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager) 065 */ 066 @SuppressWarnings("unchecked") 067 @Override 068 public WorkflowsInfo execute(EntityManager em) throws JPAExecutorException { 069 List<String> orArray = new ArrayList<String>(); 070 List<String> colArray = new ArrayList<String>(); 071 List<String> valArray = new ArrayList<String>(); 072 StringBuilder sb = new StringBuilder(""); 073 boolean isStatus = false; 074 boolean isAppName = false; 075 boolean isUser = false; 076 boolean isEnabled = false; 077 boolean isId = false; 078 int index = 0; 079 for (Map.Entry<String, List<String>> entry : filter.entrySet()) { 080 String colName = null; 081 String colVar = null; 082 if (entry.getKey().equals(OozieClient.FILTER_GROUP)) { 083 XLog.getLog(getClass()).warn("Filter by 'group' is not supported anymore"); 084 } else { 085 if (entry.getKey().equals(OozieClient.FILTER_STATUS)) { 086 List<String> values = filter.get(OozieClient.FILTER_STATUS); 087 colName = "status"; 088 for (int i = 0; i < values.size(); i++) { 089 colVar = "status"; 090 colVar = colVar + index; 091 if (!isEnabled && !isStatus) { 092 sb.append(seletStr).append(" where w.status IN (:status" + index); 093 isStatus = true; 094 isEnabled = true; 095 } 096 else { 097 if (isEnabled && !isStatus) { 098 sb.append(" and w.status IN (:status" + index); 099 isStatus = true; 100 } 101 else { 102 if (isStatus) { 103 sb.append(", :status" + index); 104 } 105 } 106 } 107 if (i == values.size() - 1) { 108 sb.append(")"); 109 } 110 index++; 111 valArray.add(values.get(i)); 112 orArray.add(colName); 113 colArray.add(colVar); 114 } 115 } 116 else { 117 if (entry.getKey().equals(OozieClient.FILTER_NAME)) { 118 List<String> values = filter.get(OozieClient.FILTER_NAME); 119 colName = "appName"; 120 for (int i = 0; i < values.size(); i++) { 121 colVar = "appName"; 122 colVar = colVar + index; 123 if (!isEnabled && !isAppName) { 124 sb.append(seletStr).append(" where w.appName IN (:appName" + index); 125 isAppName = true; 126 isEnabled = true; 127 } 128 else { 129 if (isEnabled && !isAppName) { 130 sb.append(" and w.appName IN (:appName" + index); 131 isAppName = true; 132 } 133 else { 134 if (isAppName) { 135 sb.append(", :appName" + index); 136 } 137 } 138 } 139 if (i == values.size() - 1) { 140 sb.append(")"); 141 } 142 index++; 143 valArray.add(values.get(i)); 144 orArray.add(colName); 145 colArray.add(colVar); 146 } 147 } 148 else { 149 if (entry.getKey().equals(OozieClient.FILTER_USER)) { 150 List<String> values = filter.get(OozieClient.FILTER_USER); 151 colName = "user"; 152 for (int i = 0; i < values.size(); i++) { 153 colVar = "user"; 154 colVar = colVar + index; 155 if (!isEnabled && !isUser) { 156 sb.append(seletStr).append(" where w.user IN (:user" + index); 157 isUser = true; 158 isEnabled = true; 159 } 160 else { 161 if (isEnabled && !isUser) { 162 sb.append(" and w.user IN (:user" + index); 163 isUser = true; 164 } 165 else { 166 if (isUser) { 167 sb.append(", :user" + index); 168 } 169 } 170 } 171 if (i == values.size() - 1) { 172 sb.append(")"); 173 } 174 index++; 175 valArray.add(values.get(i)); 176 orArray.add(colName); 177 colArray.add(colVar); 178 } 179 } 180 } 181 if (entry.getKey().equals(OozieClient.FILTER_ID)) { 182 List<String> values = filter.get(OozieClient.FILTER_ID); 183 colName = "id"; 184 for (int i = 0; i < values.size(); i++) { 185 colVar = "id"; 186 colVar = colVar + index; 187 if (!isEnabled && !isId) { 188 sb.append(seletStr).append(" where w.id IN (:id" + index); 189 isId = true; 190 isEnabled = true; 191 } 192 else { 193 if (isEnabled && !isId) { 194 sb.append(" and w.id IN (:id" + index); 195 isId = true; 196 } 197 else { 198 if (isId) { 199 sb.append(", :id" + index); 200 } 201 } 202 } 203 if (i == values.size() - 1) { 204 sb.append(")"); 205 } 206 index++; 207 valArray.add(values.get(i)); 208 orArray.add(colName); 209 colArray.add(colVar); 210 } 211 } 212 } 213 } 214 } 215 216 int realLen = 0; 217 218 Query q = null; 219 Query qTotal = null; 220 if (orArray.size() == 0) { 221 q = em.createNamedQuery("GET_WORKFLOWS_COLUMNS"); 222 q.setFirstResult(start - 1); 223 q.setMaxResults(len); 224 qTotal = em.createNamedQuery("GET_WORKFLOWS_COUNT"); 225 } 226 else { 227 if (orArray.size() > 0) { 228 StringBuilder sbTotal = new StringBuilder(sb); 229 sb.append(" order by w.startTimestamp desc "); 230 q = em.createQuery(sb.toString()); 231 q.setFirstResult(start - 1); 232 q.setMaxResults(len); 233 qTotal = em.createQuery(sbTotal.toString().replace(seletStr, countStr)); 234 for (int i = 0; i < orArray.size(); i++) { 235 q.setParameter(colArray.get(i), valArray.get(i)); 236 qTotal.setParameter(colArray.get(i), valArray.get(i)); 237 } 238 } 239 } 240 241 OpenJPAQuery kq = OpenJPAPersistence.cast(q); 242 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan(); 243 fetch.setFetchBatchSize(20); 244 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE); 245 fetch.setFetchDirection(FetchDirection.FORWARD); 246 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST); 247 List<?> resultList = q.getResultList(); 248 List<Object[]> objectArrList = (List<Object[]>) resultList; 249 List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>(); 250 251 for (Object[] arr : objectArrList) { 252 WorkflowJobBean ww = getBeanForWorkflowFromArray(arr); 253 wfBeansList.add(ww); 254 } 255 256 realLen = ((Long) qTotal.getSingleResult()).intValue(); 257 258 return new WorkflowsInfo(wfBeansList, start, len, realLen); 259 } 260 261 /* (non-Javadoc) 262 * @see org.apache.oozie.executor.jpa.JPAExecutor#getName() 263 */ 264 @Override 265 public String getName() { 266 return "WorkflowsJobGetJPAExecutor"; 267 } 268 269 private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) { 270 271 WorkflowJobBean wfBean = new WorkflowJobBean(); 272 wfBean.setId((String) arr[0]); 273 if (arr[1] != null) { 274 wfBean.setAppName((String) arr[1]); 275 } 276 if (arr[2] != null) { 277 wfBean.setStatus(Status.valueOf((String) arr[2])); 278 } 279 if (arr[3] != null) { 280 wfBean.setRun((Integer) arr[3]); 281 } 282 if (arr[4] != null) { 283 wfBean.setUser((String) arr[4]); 284 } 285 if (arr[5] != null) { 286 wfBean.setGroup((String) arr[5]); 287 } 288 if (arr[6] != null) { 289 wfBean.setCreatedTime((Timestamp) arr[6]); 290 } 291 if (arr[7] != null) { 292 wfBean.setStartTime((Timestamp) arr[7]); 293 } 294 if (arr[8] != null) { 295 wfBean.setLastModifiedTime((Timestamp) arr[8]); 296 } 297 if (arr[9] != null) { 298 wfBean.setEndTime((Timestamp) arr[9]); 299 } 300 if (arr[10] != null) { 301 wfBean.setExternalId((String) arr[10]); 302 } 303 return wfBean; 304 } 305 }