This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.executor.jpa;
019
020 import java.sql.Timestamp;
021 import java.util.ArrayList;
022 import java.util.List;
023 import java.util.Map;
024
025 import javax.persistence.EntityManager;
026 import javax.persistence.Query;
027
028 import org.apache.oozie.WorkflowJobBean;
029 import org.apache.oozie.WorkflowsInfo;
030 import org.apache.oozie.client.OozieClient;
031 import org.apache.oozie.client.WorkflowJob.Status;
032 import org.apache.openjpa.persistence.OpenJPAPersistence;
033 import org.apache.openjpa.persistence.OpenJPAQuery;
034 import org.apache.openjpa.persistence.jdbc.FetchDirection;
035 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
036 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
037 import org.apache.openjpa.persistence.jdbc.ResultSetType;
038
039 public class WorkflowsJobGetJPAExecutor implements JPAExecutor<WorkflowsInfo> {
040
041 private static final String seletStr = "Select w.id, w.appName, w.status, w.run, w.user, w.group, w.createdTimestamp, "
042 + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
043 private static final String countStr = "Select count(w) from WorkflowJobBean w";
044
045 private final Map<String, List<String>> filter;
046 private final int start;
047 private final int len;
048
049 /**
050 * This JPA Executor gets the workflows info for the range.
051 *
052 * @param filter
053 * @param start
054 * @param len
055 */
056 public WorkflowsJobGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
057 this.filter = filter;
058 this.start = start;
059 this.len = len;
060 }
061
062 /* (non-Javadoc)
063 * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
064 */
065 @SuppressWarnings("unchecked")
066 @Override
067 public WorkflowsInfo execute(EntityManager em) throws JPAExecutorException {
068 List<String> orArray = new ArrayList<String>();
069 List<String> colArray = new ArrayList<String>();
070 List<String> valArray = new ArrayList<String>();
071 StringBuilder sb = new StringBuilder("");
072 boolean isStatus = false;
073 boolean isGroup = false;
074 boolean isAppName = false;
075 boolean isUser = false;
076 boolean isEnabled = false;
077 boolean isId = false;
078 int index = 0;
079 for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
080 String colName = null;
081 String colVar = null;
082 if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
083 List<String> values = filter.get(OozieClient.FILTER_GROUP);
084 colName = "group";
085 for (int i = 0; i < values.size(); i++) {
086 colVar = "group";
087 colVar = colVar + index;
088 if (!isEnabled && !isGroup) {
089 sb.append(seletStr).append(" where w.group IN (:group" + index);
090 isGroup = true;
091 isEnabled = true;
092 }
093 else {
094 if (isEnabled && !isGroup) {
095 sb.append(" and w.group IN (:group" + index);
096 isGroup = true;
097 }
098 else {
099 if (isGroup) {
100 sb.append(", :group" + index);
101 }
102 }
103 }
104 if (i == values.size() - 1) {
105 sb.append(")");
106 }
107 index++;
108 valArray.add(values.get(i));
109 orArray.add(colName);
110 colArray.add(colVar);
111 }
112 }
113 else {
114 if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
115 List<String> values = filter.get(OozieClient.FILTER_STATUS);
116 colName = "status";
117 for (int i = 0; i < values.size(); i++) {
118 colVar = "status";
119 colVar = colVar + index;
120 if (!isEnabled && !isStatus) {
121 sb.append(seletStr).append(" where w.status IN (:status" + index);
122 isStatus = true;
123 isEnabled = true;
124 }
125 else {
126 if (isEnabled && !isStatus) {
127 sb.append(" and w.status IN (:status" + index);
128 isStatus = true;
129 }
130 else {
131 if (isStatus) {
132 sb.append(", :status" + index);
133 }
134 }
135 }
136 if (i == values.size() - 1) {
137 sb.append(")");
138 }
139 index++;
140 valArray.add(values.get(i));
141 orArray.add(colName);
142 colArray.add(colVar);
143 }
144 }
145 else {
146 if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
147 List<String> values = filter.get(OozieClient.FILTER_NAME);
148 colName = "appName";
149 for (int i = 0; i < values.size(); i++) {
150 colVar = "appName";
151 colVar = colVar + index;
152 if (!isEnabled && !isAppName) {
153 sb.append(seletStr).append(" where w.appName IN (:appName" + index);
154 isAppName = true;
155 isEnabled = true;
156 }
157 else {
158 if (isEnabled && !isAppName) {
159 sb.append(" and w.appName IN (:appName" + index);
160 isAppName = true;
161 }
162 else {
163 if (isAppName) {
164 sb.append(", :appName" + index);
165 }
166 }
167 }
168 if (i == values.size() - 1) {
169 sb.append(")");
170 }
171 index++;
172 valArray.add(values.get(i));
173 orArray.add(colName);
174 colArray.add(colVar);
175 }
176 }
177 else {
178 if (entry.getKey().equals(OozieClient.FILTER_USER)) {
179 List<String> values = filter.get(OozieClient.FILTER_USER);
180 colName = "user";
181 for (int i = 0; i < values.size(); i++) {
182 colVar = "user";
183 colVar = colVar + index;
184 if (!isEnabled && !isUser) {
185 sb.append(seletStr).append(" where w.user IN (:user" + index);
186 isUser = true;
187 isEnabled = true;
188 }
189 else {
190 if (isEnabled && !isUser) {
191 sb.append(" and w.user IN (:user" + index);
192 isUser = true;
193 }
194 else {
195 if (isUser) {
196 sb.append(", :user" + index);
197 }
198 }
199 }
200 if (i == values.size() - 1) {
201 sb.append(")");
202 }
203 index++;
204 valArray.add(values.get(i));
205 orArray.add(colName);
206 colArray.add(colVar);
207 }
208 }
209 }
210 if (entry.getKey().equals(OozieClient.FILTER_ID)) {
211 List<String> values = filter.get(OozieClient.FILTER_ID);
212 colName = "id";
213 for (int i = 0; i < values.size(); i++) {
214 colVar = "id";
215 colVar = colVar + index;
216 if (!isEnabled && !isId) {
217 sb.append(seletStr).append(" where w.id IN (:id" + index);
218 isId = true;
219 isEnabled = true;
220 }
221 else {
222 if (isEnabled && !isId) {
223 sb.append(" and w.id IN (:id" + index);
224 isId = true;
225 }
226 else {
227 if (isId) {
228 sb.append(", :id" + index);
229 }
230 }
231 }
232 if (i == values.size() - 1) {
233 sb.append(")");
234 }
235 index++;
236 valArray.add(values.get(i));
237 orArray.add(colName);
238 colArray.add(colVar);
239 }
240 }
241 }
242 }
243 }
244
245 int realLen = 0;
246
247 Query q = null;
248 Query qTotal = null;
249 if (orArray.size() == 0) {
250 q = em.createNamedQuery("GET_WORKFLOWS_COLUMNS");
251 q.setFirstResult(start - 1);
252 q.setMaxResults(len);
253 qTotal = em.createNamedQuery("GET_WORKFLOWS_COUNT");
254 }
255 else {
256 if (orArray.size() > 0) {
257 StringBuilder sbTotal = new StringBuilder(sb);
258 sb.append(" order by w.startTimestamp desc ");
259 q = em.createQuery(sb.toString());
260 q.setFirstResult(start - 1);
261 q.setMaxResults(len);
262 qTotal = em.createQuery(sbTotal.toString().replace(seletStr, countStr));
263 for (int i = 0; i < orArray.size(); i++) {
264 q.setParameter(colArray.get(i), valArray.get(i));
265 qTotal.setParameter(colArray.get(i), valArray.get(i));
266 }
267 }
268 }
269
270 OpenJPAQuery kq = OpenJPAPersistence.cast(q);
271 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
272 fetch.setFetchBatchSize(20);
273 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
274 fetch.setFetchDirection(FetchDirection.FORWARD);
275 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
276 List<?> resultList = q.getResultList();
277 List<Object[]> objectArrList = (List<Object[]>) resultList;
278 List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
279
280 for (Object[] arr : objectArrList) {
281 WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
282 wfBeansList.add(ww);
283 }
284
285 realLen = ((Long) qTotal.getSingleResult()).intValue();
286
287 return new WorkflowsInfo(wfBeansList, start, len, realLen);
288 }
289
290 /* (non-Javadoc)
291 * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
292 */
293 @Override
294 public String getName() {
295 return "WorkflowsJobGetJPAExecutor";
296 }
297
298 private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
299
300 WorkflowJobBean wfBean = new WorkflowJobBean();
301 wfBean.setId((String) arr[0]);
302 if (arr[1] != null) {
303 wfBean.setAppName((String) arr[1]);
304 }
305 if (arr[2] != null) {
306 wfBean.setStatus(Status.valueOf((String) arr[2]));
307 }
308 if (arr[3] != null) {
309 wfBean.setRun((Integer) arr[3]);
310 }
311 if (arr[4] != null) {
312 wfBean.setUser((String) arr[4]);
313 }
314 if (arr[5] != null) {
315 wfBean.setGroup((String) arr[5]);
316 }
317 if (arr[6] != null) {
318 wfBean.setCreatedTime((Timestamp) arr[6]);
319 }
320 if (arr[7] != null) {
321 wfBean.setStartTime((Timestamp) arr[7]);
322 }
323 if (arr[8] != null) {
324 wfBean.setLastModifiedTime((Timestamp) arr[8]);
325 }
326 if (arr[9] != null) {
327 wfBean.setEndTime((Timestamp) arr[9]);
328 }
329 return wfBean;
330 }
331 }