001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.executor.jpa;
019    
020    import java.sql.Timestamp;
021    import java.util.ArrayList;
022    import java.util.List;
023    import java.util.Map;
024    
025    import javax.persistence.EntityManager;
026    import javax.persistence.Query;
027    
028    import org.apache.oozie.WorkflowJobBean;
029    import org.apache.oozie.WorkflowsInfo;
030    import org.apache.oozie.client.OozieClient;
031    import org.apache.oozie.client.WorkflowJob.Status;
032    import org.apache.openjpa.persistence.OpenJPAPersistence;
033    import org.apache.openjpa.persistence.OpenJPAQuery;
034    import org.apache.openjpa.persistence.jdbc.FetchDirection;
035    import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
036    import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
037    import org.apache.openjpa.persistence.jdbc.ResultSetType;
038    
039    public class WorkflowsJobGetJPAExecutor implements JPAExecutor<WorkflowsInfo> {
040    
041        private static final String seletStr = "Select w.id, w.appName, w.status, w.run, w.user, w.group, w.createdTimestamp, "
042            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
043        private static final String countStr = "Select count(w) from WorkflowJobBean w";
044    
045        private final Map<String, List<String>> filter;
046        private final int start;
047        private final int len;
048    
049        /**
050         * This JPA Executor gets the workflows info for the range.
051         *
052         * @param filter
053         * @param start
054         * @param len
055         */
056        public WorkflowsJobGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
057            this.filter = filter;
058            this.start = start;
059            this.len = len;
060        }
061    
062        /* (non-Javadoc)
063         * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
064         */
065        @SuppressWarnings("unchecked")
066        @Override
067        public WorkflowsInfo execute(EntityManager em) throws JPAExecutorException {
068            List<String> orArray = new ArrayList<String>();
069            List<String> colArray = new ArrayList<String>();
070            List<String> valArray = new ArrayList<String>();
071            StringBuilder sb = new StringBuilder("");
072            boolean isStatus = false;
073            boolean isGroup = false;
074            boolean isAppName = false;
075            boolean isUser = false;
076            boolean isEnabled = false;
077            boolean isId = false;
078            int index = 0;
079            for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
080                String colName = null;
081                String colVar = null;
082                if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
083                    List<String> values = filter.get(OozieClient.FILTER_GROUP);
084                    colName = "group";
085                    for (int i = 0; i < values.size(); i++) {
086                        colVar = "group";
087                        colVar = colVar + index;
088                        if (!isEnabled && !isGroup) {
089                            sb.append(seletStr).append(" where w.group IN (:group" + index);
090                            isGroup = true;
091                            isEnabled = true;
092                        }
093                        else {
094                            if (isEnabled && !isGroup) {
095                                sb.append(" and w.group IN (:group" + index);
096                                isGroup = true;
097                            }
098                            else {
099                                if (isGroup) {
100                                    sb.append(", :group" + index);
101                                }
102                            }
103                        }
104                        if (i == values.size() - 1) {
105                            sb.append(")");
106                        }
107                        index++;
108                        valArray.add(values.get(i));
109                        orArray.add(colName);
110                        colArray.add(colVar);
111                    }
112                }
113                else {
114                    if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
115                        List<String> values = filter.get(OozieClient.FILTER_STATUS);
116                        colName = "status";
117                        for (int i = 0; i < values.size(); i++) {
118                            colVar = "status";
119                            colVar = colVar + index;
120                            if (!isEnabled && !isStatus) {
121                                sb.append(seletStr).append(" where w.status IN (:status" + index);
122                                isStatus = true;
123                                isEnabled = true;
124                            }
125                            else {
126                                if (isEnabled && !isStatus) {
127                                    sb.append(" and w.status IN (:status" + index);
128                                    isStatus = true;
129                                }
130                                else {
131                                    if (isStatus) {
132                                        sb.append(", :status" + index);
133                                    }
134                                }
135                            }
136                            if (i == values.size() - 1) {
137                                sb.append(")");
138                            }
139                            index++;
140                            valArray.add(values.get(i));
141                            orArray.add(colName);
142                            colArray.add(colVar);
143                        }
144                    }
145                    else {
146                        if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
147                            List<String> values = filter.get(OozieClient.FILTER_NAME);
148                            colName = "appName";
149                            for (int i = 0; i < values.size(); i++) {
150                                colVar = "appName";
151                                colVar = colVar + index;
152                                if (!isEnabled && !isAppName) {
153                                    sb.append(seletStr).append(" where w.appName IN (:appName" + index);
154                                    isAppName = true;
155                                    isEnabled = true;
156                                }
157                                else {
158                                    if (isEnabled && !isAppName) {
159                                        sb.append(" and w.appName IN (:appName" + index);
160                                        isAppName = true;
161                                    }
162                                    else {
163                                        if (isAppName) {
164                                            sb.append(", :appName" + index);
165                                        }
166                                    }
167                                }
168                                if (i == values.size() - 1) {
169                                    sb.append(")");
170                                }
171                                index++;
172                                valArray.add(values.get(i));
173                                orArray.add(colName);
174                                colArray.add(colVar);
175                            }
176                        }
177                        else {
178                            if (entry.getKey().equals(OozieClient.FILTER_USER)) {
179                                List<String> values = filter.get(OozieClient.FILTER_USER);
180                                colName = "user";
181                                for (int i = 0; i < values.size(); i++) {
182                                    colVar = "user";
183                                    colVar = colVar + index;
184                                    if (!isEnabled && !isUser) {
185                                        sb.append(seletStr).append(" where w.user IN (:user" + index);
186                                        isUser = true;
187                                        isEnabled = true;
188                                    }
189                                    else {
190                                        if (isEnabled && !isUser) {
191                                            sb.append(" and w.user IN (:user" + index);
192                                            isUser = true;
193                                        }
194                                        else {
195                                            if (isUser) {
196                                                sb.append(", :user" + index);
197                                            }
198                                        }
199                                    }
200                                    if (i == values.size() - 1) {
201                                        sb.append(")");
202                                    }
203                                    index++;
204                                    valArray.add(values.get(i));
205                                    orArray.add(colName);
206                                    colArray.add(colVar);
207                                }
208                            }
209                        }
210                        if (entry.getKey().equals(OozieClient.FILTER_ID)) {
211                            List<String> values = filter.get(OozieClient.FILTER_ID);
212                            colName = "id";
213                            for (int i = 0; i < values.size(); i++) {
214                                colVar = "id";
215                                colVar = colVar + index;
216                                if (!isEnabled && !isId) {
217                                    sb.append(seletStr).append(" where w.id IN (:id" + index);
218                                    isId = true;
219                                    isEnabled = true;
220                                }
221                                else {
222                                    if (isEnabled && !isId) {
223                                        sb.append(" and w.id IN (:id" + index);
224                                        isId = true;
225                                    }
226                                    else {
227                                        if (isId) {
228                                            sb.append(", :id" + index);
229                                        }
230                                    }
231                                }
232                                if (i == values.size() - 1) {
233                                    sb.append(")");
234                                }
235                                index++;
236                                valArray.add(values.get(i));
237                                orArray.add(colName);
238                                colArray.add(colVar);
239                            }
240                        }
241                    }
242                }
243            }
244    
245            int realLen = 0;
246    
247            Query q = null;
248            Query qTotal = null;
249            if (orArray.size() == 0) {
250                q = em.createNamedQuery("GET_WORKFLOWS_COLUMNS");
251                q.setFirstResult(start - 1);
252                q.setMaxResults(len);
253                qTotal = em.createNamedQuery("GET_WORKFLOWS_COUNT");
254            }
255            else {
256                if (orArray.size() > 0) {
257                    StringBuilder sbTotal = new StringBuilder(sb);
258                    sb.append(" order by w.startTimestamp desc ");
259                    q = em.createQuery(sb.toString());
260                    q.setFirstResult(start - 1);
261                    q.setMaxResults(len);
262                    qTotal = em.createQuery(sbTotal.toString().replace(seletStr, countStr));
263                    for (int i = 0; i < orArray.size(); i++) {
264                        q.setParameter(colArray.get(i), valArray.get(i));
265                        qTotal.setParameter(colArray.get(i), valArray.get(i));
266                    }
267                }
268            }
269    
270            OpenJPAQuery kq = OpenJPAPersistence.cast(q);
271            JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
272            fetch.setFetchBatchSize(20);
273            fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
274            fetch.setFetchDirection(FetchDirection.FORWARD);
275            fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
276            List<?> resultList = q.getResultList();
277            List<Object[]> objectArrList = (List<Object[]>) resultList;
278            List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
279    
280            for (Object[] arr : objectArrList) {
281                WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
282                wfBeansList.add(ww);
283            }
284    
285            realLen = ((Long) qTotal.getSingleResult()).intValue();
286    
287            return new WorkflowsInfo(wfBeansList, start, len, realLen);
288        }
289    
290        /* (non-Javadoc)
291         * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
292         */
293        @Override
294        public String getName() {
295            return "WorkflowsJobGetJPAExecutor";
296        }
297    
298        private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
299    
300            WorkflowJobBean wfBean = new WorkflowJobBean();
301            wfBean.setId((String) arr[0]);
302            if (arr[1] != null) {
303                wfBean.setAppName((String) arr[1]);
304            }
305            if (arr[2] != null) {
306                wfBean.setStatus(Status.valueOf((String) arr[2]));
307            }
308            if (arr[3] != null) {
309                wfBean.setRun((Integer) arr[3]);
310            }
311            if (arr[4] != null) {
312                wfBean.setUser((String) arr[4]);
313            }
314            if (arr[5] != null) {
315                wfBean.setGroup((String) arr[5]);
316            }
317            if (arr[6] != null) {
318                wfBean.setCreatedTime((Timestamp) arr[6]);
319            }
320            if (arr[7] != null) {
321                wfBean.setStartTime((Timestamp) arr[7]);
322            }
323            if (arr[8] != null) {
324                wfBean.setLastModifiedTime((Timestamp) arr[8]);
325            }
326            if (arr[9] != null) {
327                wfBean.setEndTime((Timestamp) arr[9]);
328            }
329            return wfBean;
330        }
331    }