001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.executor.jpa; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025import java.util.Map; 026 027import javax.persistence.EntityManager; 028import javax.persistence.Query; 029 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.WorkflowJobBean; 032import org.apache.oozie.WorkflowsInfo; 033import org.apache.oozie.client.OozieClient; 034import org.apache.oozie.client.WorkflowJob.Status; 035import org.apache.oozie.store.StoreStatusFilter; 036import org.apache.oozie.util.DateUtils; 037import org.apache.oozie.util.XLog; 038import org.apache.openjpa.persistence.OpenJPAPersistence; 039import org.apache.openjpa.persistence.OpenJPAQuery; 040import org.apache.openjpa.persistence.jdbc.FetchDirection; 041import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan; 042import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm; 043import org.apache.openjpa.persistence.jdbc.ResultSetType; 044 045public class WorkflowsJobGetJPAExecutor implements JPAExecutor<WorkflowsInfo> { 046 047 private static final String seletStr = "Select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, " 048 + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp, w.externalId, w.parentId from WorkflowJobBean w"; 049 private static final String countStr = "Select count(w) from WorkflowJobBean w"; 050 public static final String DEFAULT_ORDER_BY = " order by w.createdTimestamp desc "; 051 052 private final Map<String, List<String>> filter; 053 private final int start; 054 private final int len; 055 056 /** 057 * This JPA Executor gets the workflows info for the range. 058 * 059 * @param filter 060 * @param start 061 * @param len 062 */ 063 public WorkflowsJobGetJPAExecutor(Map<String, List<String>> filter, int start, int len) { 064 this.filter = filter; 065 this.start = start; 066 this.len = len; 067 } 068 069 /* (non-Javadoc) 070 * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager) 071 */ 072 @SuppressWarnings("unchecked") 073 @Override 074 public WorkflowsInfo execute(EntityManager em) throws JPAExecutorException { 075 List<String> orArray = new ArrayList<String>(); 076 List<String> colArray = new ArrayList<String>(); 077 List<Object> valArray = new ArrayList<Object>(); 078 StringBuilder sb = new StringBuilder(""); 079 String orderBy = DEFAULT_ORDER_BY; 080 boolean isStatus = false; 081 boolean isAppName = false; 082 boolean isUser = false; 083 boolean isEnabled = false; 084 boolean isId = false; 085 int index = 0; 086 for (Map.Entry<String, List<String>> entry : filter.entrySet()) { 087 String colName = null; 088 String colVar = null; 089 if (entry.getKey().equals(OozieClient.FILTER_GROUP)) { 090 XLog.getLog(getClass()).warn("Filter by 'group' is not supported anymore"); 091 } else { 092 if (entry.getKey().equals(OozieClient.FILTER_STATUS)) { 093 List<String> values = filter.get(OozieClient.FILTER_STATUS); 094 colName = "status"; 095 for (int i = 0; i < values.size(); i++) { 096 colVar = "status"; 097 colVar = colVar + index; 098 if (!isEnabled && !isStatus) { 099 sb.append(seletStr).append(" where w.statusStr IN (:status" + index); 100 isStatus = true; 101 isEnabled = true; 102 } 103 else { 104 if (isEnabled && !isStatus) { 105 sb.append(" and w.statusStr IN (:status" + index); 106 isStatus = true; 107 } 108 else { 109 if (isStatus) { 110 sb.append(", :status" + index); 111 } 112 } 113 } 114 if (i == values.size() - 1) { 115 sb.append(")"); 116 } 117 index++; 118 valArray.add(values.get(i)); 119 orArray.add(colName); 120 colArray.add(colVar); 121 } 122 } 123 else { 124 if (entry.getKey().equals(OozieClient.FILTER_NAME)) { 125 List<String> values = filter.get(OozieClient.FILTER_NAME); 126 colName = "appName"; 127 for (int i = 0; i < values.size(); i++) { 128 colVar = "appName"; 129 colVar = colVar + index; 130 if (!isEnabled && !isAppName) { 131 sb.append(seletStr).append(" where w.appName IN (:appName" + index); 132 isAppName = true; 133 isEnabled = true; 134 } 135 else { 136 if (isEnabled && !isAppName) { 137 sb.append(" and w.appName IN (:appName" + index); 138 isAppName = true; 139 } 140 else { 141 if (isAppName) { 142 sb.append(", :appName" + index); 143 } 144 } 145 } 146 if (i == values.size() - 1) { 147 sb.append(")"); 148 } 149 index++; 150 valArray.add(values.get(i)); 151 orArray.add(colName); 152 colArray.add(colVar); 153 } 154 } 155 else { 156 if (entry.getKey().equals(OozieClient.FILTER_USER)) { 157 List<String> values = filter.get(OozieClient.FILTER_USER); 158 colName = "user"; 159 for (int i = 0; i < values.size(); i++) { 160 colVar = "user"; 161 colVar = colVar + index; 162 if (!isEnabled && !isUser) { 163 sb.append(seletStr).append(" where w.user IN (:user" + index); 164 isUser = true; 165 isEnabled = true; 166 } 167 else { 168 if (isEnabled && !isUser) { 169 sb.append(" and w.user IN (:user" + index); 170 isUser = true; 171 } 172 else { 173 if (isUser) { 174 sb.append(", :user" + index); 175 } 176 } 177 } 178 if (i == values.size() - 1) { 179 sb.append(")"); 180 } 181 index++; 182 valArray.add(values.get(i)); 183 orArray.add(colName); 184 colArray.add(colVar); 185 } 186 } 187 } 188 if (entry.getKey().equals(OozieClient.FILTER_ID)) { 189 List<String> values = filter.get(OozieClient.FILTER_ID); 190 colName = "id"; 191 for (int i = 0; i < values.size(); i++) { 192 colVar = "id"; 193 colVar = colVar + index; 194 if (!isEnabled && !isId) { 195 sb.append(seletStr).append(" where w.id IN (:id" + index); 196 isId = true; 197 isEnabled = true; 198 } 199 else { 200 if (isEnabled && !isId) { 201 sb.append(" and w.id IN (:id" + index); 202 isId = true; 203 } 204 else { 205 if (isId) { 206 sb.append(", :id" + index); 207 } 208 } 209 } 210 if (i == values.size() - 1) { 211 sb.append(")"); 212 } 213 index++; 214 valArray.add(values.get(i)); 215 orArray.add(colName); 216 colArray.add(colVar); 217 } 218 } 219 else if (entry.getKey().equalsIgnoreCase(OozieClient.FILTER_CREATED_TIME_START)) { 220 List<String> values = filter.get(OozieClient.FILTER_CREATED_TIME_START); 221 colName = "createdTimestampStart"; 222 if (values.size() > 1) { 223 throw new JPAExecutorException(ErrorCode.E0302, 224 "cannot specify multiple startcreatedtime"); 225 } 226 colVar = colName; 227 colVar = colVar + index; 228 if (!isEnabled) { 229 sb.append(seletStr).append(" where w.createdTimestamp >= :" + colVar); 230 isEnabled = true; 231 } 232 else { 233 sb.append(" and w.createdTimestamp >= :" + colVar); 234 } 235 index++; 236 Date createdTime = null; 237 try { 238 createdTime = parseCreatedTimeString(values.get(0)); 239 } 240 catch (Exception e) { 241 throw new JPAExecutorException(ErrorCode.E0302, e.getMessage()); 242 } 243 Timestamp createdTimeStamp = new Timestamp(createdTime.getTime()); 244 valArray.add(createdTimeStamp); 245 orArray.add(colName); 246 colArray.add(colVar); 247 248 } 249 else if (entry.getKey().equalsIgnoreCase(OozieClient.FILTER_CREATED_TIME_END)) { 250 List<String> values = filter.get(OozieClient.FILTER_CREATED_TIME_END); 251 colName = "createdTimestampEnd"; 252 if (values.size() > 1) { 253 throw new JPAExecutorException(ErrorCode.E0302, 254 "cannot specify multiple endcreatedtime"); 255 } 256 colVar = colName; 257 colVar = colVar + index; 258 if (!isEnabled) { 259 sb.append(seletStr).append(" where w.createdTimestamp <= :" + colVar); 260 isEnabled = true; 261 } 262 else { 263 sb.append(" and w.createdTimestamp <= :" + colVar); 264 } 265 index++; 266 Date createdTime = null; 267 try { 268 createdTime = parseCreatedTimeString(values.get(0)); 269 } 270 catch (Exception e) { 271 throw new JPAExecutorException(ErrorCode.E0302, e.getMessage()); 272 } 273 Timestamp createdTimeStamp = new Timestamp(createdTime.getTime()); 274 valArray.add(createdTimeStamp); 275 orArray.add(colName); 276 colArray.add(colVar); 277 } 278 // w.id = text || w.appName.contains(text) || w.user.contains(text) 279 else if (entry.getKey().equalsIgnoreCase(OozieClient.FILTER_TEXT)) { 280 StoreStatusFilter.filterJobsUsingText(filter, sb, isEnabled, seletStr, valArray, orArray, colArray); 281 isEnabled = true; 282 } 283 } 284 } 285 } 286 287 orderBy = StoreStatusFilter.getSortBy(filter, orderBy); 288 int realLen = 0; 289 290 Query q = null; 291 Query qTotal = null; 292 if (orArray.size() == 0 && orderBy.equals(DEFAULT_ORDER_BY)) { 293 q = em.createNamedQuery("GET_WORKFLOWS_COLUMNS"); 294 q.setFirstResult(start - 1); 295 q.setMaxResults(len); 296 qTotal = em.createNamedQuery("GET_WORKFLOWS_COUNT"); 297 } 298 else { 299 sb = sb.toString().trim().length() == 0 ? sb.append(seletStr) : sb; 300 String sbTotal = sb.toString(); 301 sb.append(orderBy); 302 q = em.createQuery(sb.toString()); 303 q.setFirstResult(start - 1); 304 q.setMaxResults(len); 305 qTotal = em.createQuery(sbTotal.replace(seletStr, countStr)); 306 307 for (int i = 0; i < orArray.size(); i++) { 308 q.setParameter(colArray.get(i), valArray.get(i)); 309 qTotal.setParameter(colArray.get(i), valArray.get(i)); 310 } 311 } 312 313 OpenJPAQuery kq = OpenJPAPersistence.cast(q); 314 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan(); 315 fetch.setFetchBatchSize(20); 316 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE); 317 fetch.setFetchDirection(FetchDirection.FORWARD); 318 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST); 319 List<?> resultList = q.getResultList(); 320 List<Object[]> objectArrList = (List<Object[]>) resultList; 321 List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>(); 322 323 for (Object[] arr : objectArrList) { 324 WorkflowJobBean ww = getBeanForWorkflowFromArray(arr); 325 wfBeansList.add(ww); 326 } 327 328 realLen = ((Long) qTotal.getSingleResult()).intValue(); 329 330 return new WorkflowsInfo(wfBeansList, start, len, realLen); 331 } 332 333 /* (non-Javadoc) 334 * @see org.apache.oozie.executor.jpa.JPAExecutor#getName() 335 */ 336 @Override 337 public String getName() { 338 return "WorkflowsJobGetJPAExecutor"; 339 } 340 341 private Date parseCreatedTimeString(String time) throws Exception{ 342 Date createdTime = null; 343 int offset = 0; 344 if (Character.isLetter(time.charAt(time.length() - 1))) { 345 switch (time.charAt(time.length() - 1)) { 346 case 'd': 347 offset = Integer.parseInt(time.substring(0, time.length() - 1)); 348 if(offset > 0) { 349 throw new IllegalArgumentException("offset must be minus from currentTime."); 350 } 351 createdTime = org.apache.commons.lang.time.DateUtils.addDays(new Date(), offset); 352 break; 353 case 'h': 354 offset = Integer.parseInt(time.substring(0, time.length() - 1)); 355 if(offset > 0) { 356 throw new IllegalArgumentException("offset must be minus from currentTime."); 357 } 358 createdTime = org.apache.commons.lang.time.DateUtils.addHours(new Date(), offset); 359 break; 360 case 'm': 361 offset = Integer.parseInt(time.substring(0, time.length() - 1)); 362 if(offset > 0) { 363 throw new IllegalArgumentException("offset must be minus from currentTime."); 364 } 365 createdTime = org.apache.commons.lang.time.DateUtils.addMinutes(new Date(), offset); 366 break; 367 case 'Z': 368 createdTime = DateUtils.parseDateUTC(time); 369 break; 370 default: 371 throw new IllegalArgumentException("Unsupported time format: " + time + StoreStatusFilter.TIME_FORMAT); 372 } 373 } else { 374 throw new IllegalArgumentException("The format of time is wrong: " + time + StoreStatusFilter.TIME_FORMAT); 375 } 376 return createdTime; 377 } 378 379 private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) { 380 381 WorkflowJobBean wfBean = new WorkflowJobBean(); 382 wfBean.setId((String) arr[0]); 383 if (arr[1] != null) { 384 wfBean.setAppName((String) arr[1]); 385 } 386 if (arr[2] != null) { 387 wfBean.setStatus(Status.valueOf((String) arr[2])); 388 } 389 if (arr[3] != null) { 390 wfBean.setRun((Integer) arr[3]); 391 } 392 if (arr[4] != null) { 393 wfBean.setUser((String) arr[4]); 394 } 395 if (arr[5] != null) { 396 wfBean.setGroup((String) arr[5]); 397 } 398 if (arr[6] != null) { 399 wfBean.setCreatedTime((Timestamp) arr[6]); 400 } 401 if (arr[7] != null) { 402 wfBean.setStartTime((Timestamp) arr[7]); 403 } 404 if (arr[8] != null) { 405 wfBean.setLastModifiedTime((Timestamp) arr[8]); 406 } 407 if (arr[9] != null) { 408 wfBean.setEndTime((Timestamp) arr[9]); 409 } 410 if (arr[10] != null) { 411 wfBean.setExternalId((String) arr[10]); 412 } 413 if (arr[11] != null) { 414 wfBean.setParentId((String) arr[11]); 415 } 416 return wfBean; 417 } 418}