This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.executor.jpa;
019
020 import java.sql.Timestamp;
021 import java.util.ArrayList;
022 import java.util.List;
023 import java.util.Map;
024
025 import javax.persistence.EntityManager;
026 import javax.persistence.Query;
027
028 import org.apache.oozie.WorkflowJobBean;
029 import org.apache.oozie.WorkflowsInfo;
030 import org.apache.oozie.client.OozieClient;
031 import org.apache.oozie.client.WorkflowJob.Status;
032 import org.apache.oozie.util.XLog;
033 import org.apache.openjpa.persistence.OpenJPAPersistence;
034 import org.apache.openjpa.persistence.OpenJPAQuery;
035 import org.apache.openjpa.persistence.jdbc.FetchDirection;
036 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
037 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
038 import org.apache.openjpa.persistence.jdbc.ResultSetType;
039
040 public class WorkflowsJobGetJPAExecutor implements JPAExecutor<WorkflowsInfo> {
041
042 private static final String seletStr = "Select w.id, w.appName, w.status, w.run, w.user, w.group, w.createdTimestamp, "
043 + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp, w.externalId from WorkflowJobBean w";
044 private static final String countStr = "Select count(w) from WorkflowJobBean w";
045
046 private final Map<String, List<String>> filter;
047 private final int start;
048 private final int len;
049
050 /**
051 * This JPA Executor gets the workflows info for the range.
052 *
053 * @param filter
054 * @param start
055 * @param len
056 */
057 public WorkflowsJobGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
058 this.filter = filter;
059 this.start = start;
060 this.len = len;
061 }
062
063 /* (non-Javadoc)
064 * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
065 */
066 @SuppressWarnings("unchecked")
067 @Override
068 public WorkflowsInfo execute(EntityManager em) throws JPAExecutorException {
069 List<String> orArray = new ArrayList<String>();
070 List<String> colArray = new ArrayList<String>();
071 List<String> valArray = new ArrayList<String>();
072 StringBuilder sb = new StringBuilder("");
073 boolean isStatus = false;
074 boolean isAppName = false;
075 boolean isUser = false;
076 boolean isEnabled = false;
077 boolean isId = false;
078 int index = 0;
079 for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
080 String colName = null;
081 String colVar = null;
082 if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
083 XLog.getLog(getClass()).warn("Filter by 'group' is not supported anymore");
084 } else {
085 if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
086 List<String> values = filter.get(OozieClient.FILTER_STATUS);
087 colName = "status";
088 for (int i = 0; i < values.size(); i++) {
089 colVar = "status";
090 colVar = colVar + index;
091 if (!isEnabled && !isStatus) {
092 sb.append(seletStr).append(" where w.status IN (:status" + index);
093 isStatus = true;
094 isEnabled = true;
095 }
096 else {
097 if (isEnabled && !isStatus) {
098 sb.append(" and w.status IN (:status" + index);
099 isStatus = true;
100 }
101 else {
102 if (isStatus) {
103 sb.append(", :status" + index);
104 }
105 }
106 }
107 if (i == values.size() - 1) {
108 sb.append(")");
109 }
110 index++;
111 valArray.add(values.get(i));
112 orArray.add(colName);
113 colArray.add(colVar);
114 }
115 }
116 else {
117 if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
118 List<String> values = filter.get(OozieClient.FILTER_NAME);
119 colName = "appName";
120 for (int i = 0; i < values.size(); i++) {
121 colVar = "appName";
122 colVar = colVar + index;
123 if (!isEnabled && !isAppName) {
124 sb.append(seletStr).append(" where w.appName IN (:appName" + index);
125 isAppName = true;
126 isEnabled = true;
127 }
128 else {
129 if (isEnabled && !isAppName) {
130 sb.append(" and w.appName IN (:appName" + index);
131 isAppName = true;
132 }
133 else {
134 if (isAppName) {
135 sb.append(", :appName" + index);
136 }
137 }
138 }
139 if (i == values.size() - 1) {
140 sb.append(")");
141 }
142 index++;
143 valArray.add(values.get(i));
144 orArray.add(colName);
145 colArray.add(colVar);
146 }
147 }
148 else {
149 if (entry.getKey().equals(OozieClient.FILTER_USER)) {
150 List<String> values = filter.get(OozieClient.FILTER_USER);
151 colName = "user";
152 for (int i = 0; i < values.size(); i++) {
153 colVar = "user";
154 colVar = colVar + index;
155 if (!isEnabled && !isUser) {
156 sb.append(seletStr).append(" where w.user IN (:user" + index);
157 isUser = true;
158 isEnabled = true;
159 }
160 else {
161 if (isEnabled && !isUser) {
162 sb.append(" and w.user IN (:user" + index);
163 isUser = true;
164 }
165 else {
166 if (isUser) {
167 sb.append(", :user" + index);
168 }
169 }
170 }
171 if (i == values.size() - 1) {
172 sb.append(")");
173 }
174 index++;
175 valArray.add(values.get(i));
176 orArray.add(colName);
177 colArray.add(colVar);
178 }
179 }
180 }
181 if (entry.getKey().equals(OozieClient.FILTER_ID)) {
182 List<String> values = filter.get(OozieClient.FILTER_ID);
183 colName = "id";
184 for (int i = 0; i < values.size(); i++) {
185 colVar = "id";
186 colVar = colVar + index;
187 if (!isEnabled && !isId) {
188 sb.append(seletStr).append(" where w.id IN (:id" + index);
189 isId = true;
190 isEnabled = true;
191 }
192 else {
193 if (isEnabled && !isId) {
194 sb.append(" and w.id IN (:id" + index);
195 isId = true;
196 }
197 else {
198 if (isId) {
199 sb.append(", :id" + index);
200 }
201 }
202 }
203 if (i == values.size() - 1) {
204 sb.append(")");
205 }
206 index++;
207 valArray.add(values.get(i));
208 orArray.add(colName);
209 colArray.add(colVar);
210 }
211 }
212 }
213 }
214 }
215
216 int realLen = 0;
217
218 Query q = null;
219 Query qTotal = null;
220 if (orArray.size() == 0) {
221 q = em.createNamedQuery("GET_WORKFLOWS_COLUMNS");
222 q.setFirstResult(start - 1);
223 q.setMaxResults(len);
224 qTotal = em.createNamedQuery("GET_WORKFLOWS_COUNT");
225 }
226 else {
227 if (orArray.size() > 0) {
228 StringBuilder sbTotal = new StringBuilder(sb);
229 sb.append(" order by w.startTimestamp desc ");
230 q = em.createQuery(sb.toString());
231 q.setFirstResult(start - 1);
232 q.setMaxResults(len);
233 qTotal = em.createQuery(sbTotal.toString().replace(seletStr, countStr));
234 for (int i = 0; i < orArray.size(); i++) {
235 q.setParameter(colArray.get(i), valArray.get(i));
236 qTotal.setParameter(colArray.get(i), valArray.get(i));
237 }
238 }
239 }
240
241 OpenJPAQuery kq = OpenJPAPersistence.cast(q);
242 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
243 fetch.setFetchBatchSize(20);
244 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
245 fetch.setFetchDirection(FetchDirection.FORWARD);
246 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
247 List<?> resultList = q.getResultList();
248 List<Object[]> objectArrList = (List<Object[]>) resultList;
249 List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
250
251 for (Object[] arr : objectArrList) {
252 WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
253 wfBeansList.add(ww);
254 }
255
256 realLen = ((Long) qTotal.getSingleResult()).intValue();
257
258 return new WorkflowsInfo(wfBeansList, start, len, realLen);
259 }
260
261 /* (non-Javadoc)
262 * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
263 */
264 @Override
265 public String getName() {
266 return "WorkflowsJobGetJPAExecutor";
267 }
268
269 private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
270
271 WorkflowJobBean wfBean = new WorkflowJobBean();
272 wfBean.setId((String) arr[0]);
273 if (arr[1] != null) {
274 wfBean.setAppName((String) arr[1]);
275 }
276 if (arr[2] != null) {
277 wfBean.setStatus(Status.valueOf((String) arr[2]));
278 }
279 if (arr[3] != null) {
280 wfBean.setRun((Integer) arr[3]);
281 }
282 if (arr[4] != null) {
283 wfBean.setUser((String) arr[4]);
284 }
285 if (arr[5] != null) {
286 wfBean.setGroup((String) arr[5]);
287 }
288 if (arr[6] != null) {
289 wfBean.setCreatedTime((Timestamp) arr[6]);
290 }
291 if (arr[7] != null) {
292 wfBean.setStartTime((Timestamp) arr[7]);
293 }
294 if (arr[8] != null) {
295 wfBean.setLastModifiedTime((Timestamp) arr[8]);
296 }
297 if (arr[9] != null) {
298 wfBean.setEndTime((Timestamp) arr[9]);
299 }
300 if (arr[10] != null) {
301 wfBean.setExternalId((String) arr[10]);
302 }
303 return wfBean;
304 }
305 }