001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.executor.jpa; 019 020 import java.sql.Timestamp; 021 import java.util.ArrayList; 022 import java.util.List; 023 import java.util.Map; 024 025 import javax.persistence.EntityManager; 026 import javax.persistence.Query; 027 028 import org.apache.oozie.WorkflowJobBean; 029 import org.apache.oozie.WorkflowsInfo; 030 import org.apache.oozie.client.OozieClient; 031 import org.apache.oozie.client.WorkflowJob.Status; 032 import org.apache.openjpa.persistence.OpenJPAPersistence; 033 import org.apache.openjpa.persistence.OpenJPAQuery; 034 import org.apache.openjpa.persistence.jdbc.FetchDirection; 035 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan; 036 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm; 037 import org.apache.openjpa.persistence.jdbc.ResultSetType; 038 039 public class WorkflowsJobGetJPAExecutor implements JPAExecutor<WorkflowsInfo> { 040 041 private static final String seletStr = "Select w.id, w.appName, w.status, w.run, w.user, w.group, w.createdTimestamp, " 042 + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w"; 043 private static final String countStr = "Select count(w) from WorkflowJobBean w"; 044 045 private final Map<String, List<String>> filter; 046 private final int start; 047 private final int len; 048 049 /** 050 * This JPA Executor gets the workflows info for the range. 051 * 052 * @param filter 053 * @param start 054 * @param len 055 */ 056 public WorkflowsJobGetJPAExecutor(Map<String, List<String>> filter, int start, int len) { 057 this.filter = filter; 058 this.start = start; 059 this.len = len; 060 } 061 062 /* (non-Javadoc) 063 * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager) 064 */ 065 @SuppressWarnings("unchecked") 066 @Override 067 public WorkflowsInfo execute(EntityManager em) throws JPAExecutorException { 068 List<String> orArray = new ArrayList<String>(); 069 List<String> colArray = new ArrayList<String>(); 070 List<String> valArray = new ArrayList<String>(); 071 StringBuilder sb = new StringBuilder(""); 072 boolean isStatus = false; 073 boolean isGroup = false; 074 boolean isAppName = false; 075 boolean isUser = false; 076 boolean isEnabled = false; 077 boolean isId = false; 078 int index = 0; 079 for (Map.Entry<String, List<String>> entry : filter.entrySet()) { 080 String colName = null; 081 String colVar = null; 082 if (entry.getKey().equals(OozieClient.FILTER_GROUP)) { 083 List<String> values = filter.get(OozieClient.FILTER_GROUP); 084 colName = "group"; 085 for (int i = 0; i < values.size(); i++) { 086 colVar = "group"; 087 colVar = colVar + index; 088 if (!isEnabled && !isGroup) { 089 sb.append(seletStr).append(" where w.group IN (:group" + index); 090 isGroup = true; 091 isEnabled = true; 092 } 093 else { 094 if (isEnabled && !isGroup) { 095 sb.append(" and w.group IN (:group" + index); 096 isGroup = true; 097 } 098 else { 099 if (isGroup) { 100 sb.append(", :group" + index); 101 } 102 } 103 } 104 if (i == values.size() - 1) { 105 sb.append(")"); 106 } 107 index++; 108 valArray.add(values.get(i)); 109 orArray.add(colName); 110 colArray.add(colVar); 111 } 112 } 113 else { 114 if (entry.getKey().equals(OozieClient.FILTER_STATUS)) { 115 List<String> values = filter.get(OozieClient.FILTER_STATUS); 116 colName = "status"; 117 for (int i = 0; i < values.size(); i++) { 118 colVar = "status"; 119 colVar = colVar + index; 120 if (!isEnabled && !isStatus) { 121 sb.append(seletStr).append(" where w.status IN (:status" + index); 122 isStatus = true; 123 isEnabled = true; 124 } 125 else { 126 if (isEnabled && !isStatus) { 127 sb.append(" and w.status IN (:status" + index); 128 isStatus = true; 129 } 130 else { 131 if (isStatus) { 132 sb.append(", :status" + index); 133 } 134 } 135 } 136 if (i == values.size() - 1) { 137 sb.append(")"); 138 } 139 index++; 140 valArray.add(values.get(i)); 141 orArray.add(colName); 142 colArray.add(colVar); 143 } 144 } 145 else { 146 if (entry.getKey().equals(OozieClient.FILTER_NAME)) { 147 List<String> values = filter.get(OozieClient.FILTER_NAME); 148 colName = "appName"; 149 for (int i = 0; i < values.size(); i++) { 150 colVar = "appName"; 151 colVar = colVar + index; 152 if (!isEnabled && !isAppName) { 153 sb.append(seletStr).append(" where w.appName IN (:appName" + index); 154 isAppName = true; 155 isEnabled = true; 156 } 157 else { 158 if (isEnabled && !isAppName) { 159 sb.append(" and w.appName IN (:appName" + index); 160 isAppName = true; 161 } 162 else { 163 if (isAppName) { 164 sb.append(", :appName" + index); 165 } 166 } 167 } 168 if (i == values.size() - 1) { 169 sb.append(")"); 170 } 171 index++; 172 valArray.add(values.get(i)); 173 orArray.add(colName); 174 colArray.add(colVar); 175 } 176 } 177 else { 178 if (entry.getKey().equals(OozieClient.FILTER_USER)) { 179 List<String> values = filter.get(OozieClient.FILTER_USER); 180 colName = "user"; 181 for (int i = 0; i < values.size(); i++) { 182 colVar = "user"; 183 colVar = colVar + index; 184 if (!isEnabled && !isUser) { 185 sb.append(seletStr).append(" where w.user IN (:user" + index); 186 isUser = true; 187 isEnabled = true; 188 } 189 else { 190 if (isEnabled && !isUser) { 191 sb.append(" and w.user IN (:user" + index); 192 isUser = true; 193 } 194 else { 195 if (isUser) { 196 sb.append(", :user" + index); 197 } 198 } 199 } 200 if (i == values.size() - 1) { 201 sb.append(")"); 202 } 203 index++; 204 valArray.add(values.get(i)); 205 orArray.add(colName); 206 colArray.add(colVar); 207 } 208 } 209 } 210 if (entry.getKey().equals(OozieClient.FILTER_ID)) { 211 List<String> values = filter.get(OozieClient.FILTER_ID); 212 colName = "id"; 213 for (int i = 0; i < values.size(); i++) { 214 colVar = "id"; 215 colVar = colVar + index; 216 if (!isEnabled && !isId) { 217 sb.append(seletStr).append(" where w.id IN (:id" + index); 218 isId = true; 219 isEnabled = true; 220 } 221 else { 222 if (isEnabled && !isId) { 223 sb.append(" and w.id IN (:id" + index); 224 isId = true; 225 } 226 else { 227 if (isId) { 228 sb.append(", :id" + index); 229 } 230 } 231 } 232 if (i == values.size() - 1) { 233 sb.append(")"); 234 } 235 index++; 236 valArray.add(values.get(i)); 237 orArray.add(colName); 238 colArray.add(colVar); 239 } 240 } 241 } 242 } 243 } 244 245 int realLen = 0; 246 247 Query q = null; 248 Query qTotal = null; 249 if (orArray.size() == 0) { 250 q = em.createNamedQuery("GET_WORKFLOWS_COLUMNS"); 251 q.setFirstResult(start - 1); 252 q.setMaxResults(len); 253 qTotal = em.createNamedQuery("GET_WORKFLOWS_COUNT"); 254 } 255 else { 256 if (orArray.size() > 0) { 257 StringBuilder sbTotal = new StringBuilder(sb); 258 sb.append(" order by w.startTimestamp desc "); 259 q = em.createQuery(sb.toString()); 260 q.setFirstResult(start - 1); 261 q.setMaxResults(len); 262 qTotal = em.createQuery(sbTotal.toString().replace(seletStr, countStr)); 263 for (int i = 0; i < orArray.size(); i++) { 264 q.setParameter(colArray.get(i), valArray.get(i)); 265 qTotal.setParameter(colArray.get(i), valArray.get(i)); 266 } 267 } 268 } 269 270 OpenJPAQuery kq = OpenJPAPersistence.cast(q); 271 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan(); 272 fetch.setFetchBatchSize(20); 273 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE); 274 fetch.setFetchDirection(FetchDirection.FORWARD); 275 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST); 276 List<?> resultList = q.getResultList(); 277 List<Object[]> objectArrList = (List<Object[]>) resultList; 278 List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>(); 279 280 for (Object[] arr : objectArrList) { 281 WorkflowJobBean ww = getBeanForWorkflowFromArray(arr); 282 wfBeansList.add(ww); 283 } 284 285 realLen = ((Long) qTotal.getSingleResult()).intValue(); 286 287 return new WorkflowsInfo(wfBeansList, start, len, realLen); 288 } 289 290 /* (non-Javadoc) 291 * @see org.apache.oozie.executor.jpa.JPAExecutor#getName() 292 */ 293 @Override 294 public String getName() { 295 return "WorkflowsJobGetJPAExecutor"; 296 } 297 298 private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) { 299 300 WorkflowJobBean wfBean = new WorkflowJobBean(); 301 wfBean.setId((String) arr[0]); 302 if (arr[1] != null) { 303 wfBean.setAppName((String) arr[1]); 304 } 305 if (arr[2] != null) { 306 wfBean.setStatus(Status.valueOf((String) arr[2])); 307 } 308 if (arr[3] != null) { 309 wfBean.setRun((Integer) arr[3]); 310 } 311 if (arr[4] != null) { 312 wfBean.setUser((String) arr[4]); 313 } 314 if (arr[5] != null) { 315 wfBean.setGroup((String) arr[5]); 316 } 317 if (arr[6] != null) { 318 wfBean.setCreatedTime((Timestamp) arr[6]); 319 } 320 if (arr[7] != null) { 321 wfBean.setStartTime((Timestamp) arr[7]); 322 } 323 if (arr[8] != null) { 324 wfBean.setLastModifiedTime((Timestamp) arr[8]); 325 } 326 if (arr[9] != null) { 327 wfBean.setEndTime((Timestamp) arr[9]); 328 } 329 return wfBean; 330 } 331 }