001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.executor.jpa;
019    
020    import java.sql.Timestamp;
021    import java.util.ArrayList;
022    import java.util.List;
023    import java.util.Map;
024    
025    import javax.persistence.EntityManager;
026    import javax.persistence.Query;
027    
028    import org.apache.oozie.WorkflowJobBean;
029    import org.apache.oozie.WorkflowsInfo;
030    import org.apache.oozie.client.OozieClient;
031    import org.apache.oozie.client.WorkflowJob.Status;
032    import org.apache.oozie.util.XLog;
033    import org.apache.openjpa.persistence.OpenJPAPersistence;
034    import org.apache.openjpa.persistence.OpenJPAQuery;
035    import org.apache.openjpa.persistence.jdbc.FetchDirection;
036    import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
037    import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
038    import org.apache.openjpa.persistence.jdbc.ResultSetType;
039    
040    public class WorkflowsJobGetJPAExecutor implements JPAExecutor<WorkflowsInfo> {
041    
042        private static final String seletStr = "Select w.id, w.appName, w.status, w.run, w.user, w.group, w.createdTimestamp, "
043            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp, w.externalId from WorkflowJobBean w";
044        private static final String countStr = "Select count(w) from WorkflowJobBean w";
045    
046        private final Map<String, List<String>> filter;
047        private final int start;
048        private final int len;
049    
050        /**
051         * This JPA Executor gets the workflows info for the range.
052         *
053         * @param filter
054         * @param start
055         * @param len
056         */
057        public WorkflowsJobGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
058            this.filter = filter;
059            this.start = start;
060            this.len = len;
061        }
062    
063        /* (non-Javadoc)
064         * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
065         */
066        @SuppressWarnings("unchecked")
067        @Override
068        public WorkflowsInfo execute(EntityManager em) throws JPAExecutorException {
069            List<String> orArray = new ArrayList<String>();
070            List<String> colArray = new ArrayList<String>();
071            List<String> valArray = new ArrayList<String>();
072            StringBuilder sb = new StringBuilder("");
073            boolean isStatus = false;
074            boolean isAppName = false;
075            boolean isUser = false;
076            boolean isEnabled = false;
077            boolean isId = false;
078            int index = 0;
079            for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
080                String colName = null;
081                String colVar = null;
082                if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
083                    XLog.getLog(getClass()).warn("Filter by 'group' is not supported anymore");
084                } else {
085                    if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
086                        List<String> values = filter.get(OozieClient.FILTER_STATUS);
087                        colName = "status";
088                        for (int i = 0; i < values.size(); i++) {
089                            colVar = "status";
090                            colVar = colVar + index;
091                            if (!isEnabled && !isStatus) {
092                                sb.append(seletStr).append(" where w.status IN (:status" + index);
093                                isStatus = true;
094                                isEnabled = true;
095                            }
096                            else {
097                                if (isEnabled && !isStatus) {
098                                    sb.append(" and w.status IN (:status" + index);
099                                    isStatus = true;
100                                }
101                                else {
102                                    if (isStatus) {
103                                        sb.append(", :status" + index);
104                                    }
105                                }
106                            }
107                            if (i == values.size() - 1) {
108                                sb.append(")");
109                            }
110                            index++;
111                            valArray.add(values.get(i));
112                            orArray.add(colName);
113                            colArray.add(colVar);
114                        }
115                    }
116                    else {
117                        if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
118                            List<String> values = filter.get(OozieClient.FILTER_NAME);
119                            colName = "appName";
120                            for (int i = 0; i < values.size(); i++) {
121                                colVar = "appName";
122                                colVar = colVar + index;
123                                if (!isEnabled && !isAppName) {
124                                    sb.append(seletStr).append(" where w.appName IN (:appName" + index);
125                                    isAppName = true;
126                                    isEnabled = true;
127                                }
128                                else {
129                                    if (isEnabled && !isAppName) {
130                                        sb.append(" and w.appName IN (:appName" + index);
131                                        isAppName = true;
132                                    }
133                                    else {
134                                        if (isAppName) {
135                                            sb.append(", :appName" + index);
136                                        }
137                                    }
138                                }
139                                if (i == values.size() - 1) {
140                                    sb.append(")");
141                                }
142                                index++;
143                                valArray.add(values.get(i));
144                                orArray.add(colName);
145                                colArray.add(colVar);
146                            }
147                        }
148                        else {
149                            if (entry.getKey().equals(OozieClient.FILTER_USER)) {
150                                List<String> values = filter.get(OozieClient.FILTER_USER);
151                                colName = "user";
152                                for (int i = 0; i < values.size(); i++) {
153                                    colVar = "user";
154                                    colVar = colVar + index;
155                                    if (!isEnabled && !isUser) {
156                                        sb.append(seletStr).append(" where w.user IN (:user" + index);
157                                        isUser = true;
158                                        isEnabled = true;
159                                    }
160                                    else {
161                                        if (isEnabled && !isUser) {
162                                            sb.append(" and w.user IN (:user" + index);
163                                            isUser = true;
164                                        }
165                                        else {
166                                            if (isUser) {
167                                                sb.append(", :user" + index);
168                                            }
169                                        }
170                                    }
171                                    if (i == values.size() - 1) {
172                                        sb.append(")");
173                                    }
174                                    index++;
175                                    valArray.add(values.get(i));
176                                    orArray.add(colName);
177                                    colArray.add(colVar);
178                                }
179                            }
180                        }
181                        if (entry.getKey().equals(OozieClient.FILTER_ID)) {
182                            List<String> values = filter.get(OozieClient.FILTER_ID);
183                            colName = "id";
184                            for (int i = 0; i < values.size(); i++) {
185                                colVar = "id";
186                                colVar = colVar + index;
187                                if (!isEnabled && !isId) {
188                                    sb.append(seletStr).append(" where w.id IN (:id" + index);
189                                    isId = true;
190                                    isEnabled = true;
191                                }
192                                else {
193                                    if (isEnabled && !isId) {
194                                        sb.append(" and w.id IN (:id" + index);
195                                        isId = true;
196                                    }
197                                    else {
198                                        if (isId) {
199                                            sb.append(", :id" + index);
200                                        }
201                                    }
202                                }
203                                if (i == values.size() - 1) {
204                                    sb.append(")");
205                                }
206                                index++;
207                                valArray.add(values.get(i));
208                                orArray.add(colName);
209                                colArray.add(colVar);
210                            }
211                        }
212                    }
213                }
214            }
215    
216            int realLen = 0;
217    
218            Query q = null;
219            Query qTotal = null;
220            if (orArray.size() == 0) {
221                q = em.createNamedQuery("GET_WORKFLOWS_COLUMNS");
222                q.setFirstResult(start - 1);
223                q.setMaxResults(len);
224                qTotal = em.createNamedQuery("GET_WORKFLOWS_COUNT");
225            }
226            else {
227                if (orArray.size() > 0) {
228                    StringBuilder sbTotal = new StringBuilder(sb);
229                    sb.append(" order by w.startTimestamp desc ");
230                    q = em.createQuery(sb.toString());
231                    q.setFirstResult(start - 1);
232                    q.setMaxResults(len);
233                    qTotal = em.createQuery(sbTotal.toString().replace(seletStr, countStr));
234                    for (int i = 0; i < orArray.size(); i++) {
235                        q.setParameter(colArray.get(i), valArray.get(i));
236                        qTotal.setParameter(colArray.get(i), valArray.get(i));
237                    }
238                }
239            }
240    
241            OpenJPAQuery kq = OpenJPAPersistence.cast(q);
242            JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
243            fetch.setFetchBatchSize(20);
244            fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
245            fetch.setFetchDirection(FetchDirection.FORWARD);
246            fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
247            List<?> resultList = q.getResultList();
248            List<Object[]> objectArrList = (List<Object[]>) resultList;
249            List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
250    
251            for (Object[] arr : objectArrList) {
252                WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
253                wfBeansList.add(ww);
254            }
255    
256            realLen = ((Long) qTotal.getSingleResult()).intValue();
257    
258            return new WorkflowsInfo(wfBeansList, start, len, realLen);
259        }
260    
261        /* (non-Javadoc)
262         * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
263         */
264        @Override
265        public String getName() {
266            return "WorkflowsJobGetJPAExecutor";
267        }
268    
269        private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
270    
271            WorkflowJobBean wfBean = new WorkflowJobBean();
272            wfBean.setId((String) arr[0]);
273            if (arr[1] != null) {
274                wfBean.setAppName((String) arr[1]);
275            }
276            if (arr[2] != null) {
277                wfBean.setStatus(Status.valueOf((String) arr[2]));
278            }
279            if (arr[3] != null) {
280                wfBean.setRun((Integer) arr[3]);
281            }
282            if (arr[4] != null) {
283                wfBean.setUser((String) arr[4]);
284            }
285            if (arr[5] != null) {
286                wfBean.setGroup((String) arr[5]);
287            }
288            if (arr[6] != null) {
289                wfBean.setCreatedTime((Timestamp) arr[6]);
290            }
291            if (arr[7] != null) {
292                wfBean.setStartTime((Timestamp) arr[7]);
293            }
294            if (arr[8] != null) {
295                wfBean.setLastModifiedTime((Timestamp) arr[8]);
296            }
297            if (arr[9] != null) {
298                wfBean.setEndTime((Timestamp) arr[9]);
299            }
300            if (arr[10] != null) {
301                wfBean.setExternalId((String) arr[10]);
302            }
303            return wfBean;
304        }
305    }