001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import java.sql.Timestamp;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024
025import javax.persistence.EntityManager;
026import javax.persistence.Query;
027
028import org.apache.oozie.WorkflowJobBean;
029import org.apache.oozie.WorkflowsInfo;
030import org.apache.oozie.client.OozieClient;
031import org.apache.oozie.client.WorkflowJob.Status;
032import org.apache.oozie.util.XLog;
033import org.apache.openjpa.persistence.OpenJPAPersistence;
034import org.apache.openjpa.persistence.OpenJPAQuery;
035import org.apache.openjpa.persistence.jdbc.FetchDirection;
036import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
037import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
038import org.apache.openjpa.persistence.jdbc.ResultSetType;
039
040public class WorkflowsJobGetJPAExecutor implements JPAExecutor<WorkflowsInfo> {
041
042    private static final String seletStr = "Select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
043            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp, w.externalId, w.parentId from WorkflowJobBean w";
044    private static final String countStr = "Select count(w) from WorkflowJobBean w";
045
046    private final Map<String, List<String>> filter;
047    private final int start;
048    private final int len;
049
050    /**
051     * This JPA Executor gets the workflows info for the range.
052     *
053     * @param filter
054     * @param start
055     * @param len
056     */
057    public WorkflowsJobGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
058        this.filter = filter;
059        this.start = start;
060        this.len = len;
061    }
062
063    /* (non-Javadoc)
064     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
065     */
066    @SuppressWarnings("unchecked")
067    @Override
068    public WorkflowsInfo execute(EntityManager em) throws JPAExecutorException {
069        List<String> orArray = new ArrayList<String>();
070        List<String> colArray = new ArrayList<String>();
071        List<String> valArray = new ArrayList<String>();
072        StringBuilder sb = new StringBuilder("");
073        boolean isStatus = false;
074        boolean isAppName = false;
075        boolean isUser = false;
076        boolean isEnabled = false;
077        boolean isId = false;
078        int index = 0;
079        for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
080            String colName = null;
081            String colVar = null;
082            if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
083                XLog.getLog(getClass()).warn("Filter by 'group' is not supported anymore");
084            } else {
085                if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
086                    List<String> values = filter.get(OozieClient.FILTER_STATUS);
087                    colName = "status";
088                    for (int i = 0; i < values.size(); i++) {
089                        colVar = "status";
090                        colVar = colVar + index;
091                        if (!isEnabled && !isStatus) {
092                            sb.append(seletStr).append(" where w.statusStr IN (:status" + index);
093                            isStatus = true;
094                            isEnabled = true;
095                        }
096                        else {
097                            if (isEnabled && !isStatus) {
098                                sb.append(" and w.statusStr IN (:status" + index);
099                                isStatus = true;
100                            }
101                            else {
102                                if (isStatus) {
103                                    sb.append(", :status" + index);
104                                }
105                            }
106                        }
107                        if (i == values.size() - 1) {
108                            sb.append(")");
109                        }
110                        index++;
111                        valArray.add(values.get(i));
112                        orArray.add(colName);
113                        colArray.add(colVar);
114                    }
115                }
116                else {
117                    if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
118                        List<String> values = filter.get(OozieClient.FILTER_NAME);
119                        colName = "appName";
120                        for (int i = 0; i < values.size(); i++) {
121                            colVar = "appName";
122                            colVar = colVar + index;
123                            if (!isEnabled && !isAppName) {
124                                sb.append(seletStr).append(" where w.appName IN (:appName" + index);
125                                isAppName = true;
126                                isEnabled = true;
127                            }
128                            else {
129                                if (isEnabled && !isAppName) {
130                                    sb.append(" and w.appName IN (:appName" + index);
131                                    isAppName = true;
132                                }
133                                else {
134                                    if (isAppName) {
135                                        sb.append(", :appName" + index);
136                                    }
137                                }
138                            }
139                            if (i == values.size() - 1) {
140                                sb.append(")");
141                            }
142                            index++;
143                            valArray.add(values.get(i));
144                            orArray.add(colName);
145                            colArray.add(colVar);
146                        }
147                    }
148                    else {
149                        if (entry.getKey().equals(OozieClient.FILTER_USER)) {
150                            List<String> values = filter.get(OozieClient.FILTER_USER);
151                            colName = "user";
152                            for (int i = 0; i < values.size(); i++) {
153                                colVar = "user";
154                                colVar = colVar + index;
155                                if (!isEnabled && !isUser) {
156                                    sb.append(seletStr).append(" where w.user IN (:user" + index);
157                                    isUser = true;
158                                    isEnabled = true;
159                                }
160                                else {
161                                    if (isEnabled && !isUser) {
162                                        sb.append(" and w.user IN (:user" + index);
163                                        isUser = true;
164                                    }
165                                    else {
166                                        if (isUser) {
167                                            sb.append(", :user" + index);
168                                        }
169                                    }
170                                }
171                                if (i == values.size() - 1) {
172                                    sb.append(")");
173                                }
174                                index++;
175                                valArray.add(values.get(i));
176                                orArray.add(colName);
177                                colArray.add(colVar);
178                            }
179                        }
180                    }
181                    if (entry.getKey().equals(OozieClient.FILTER_ID)) {
182                        List<String> values = filter.get(OozieClient.FILTER_ID);
183                        colName = "id";
184                        for (int i = 0; i < values.size(); i++) {
185                            colVar = "id";
186                            colVar = colVar + index;
187                            if (!isEnabled && !isId) {
188                                sb.append(seletStr).append(" where w.id IN (:id" + index);
189                                isId = true;
190                                isEnabled = true;
191                            }
192                            else {
193                                if (isEnabled && !isId) {
194                                    sb.append(" and w.id IN (:id" + index);
195                                    isId = true;
196                                }
197                                else {
198                                    if (isId) {
199                                        sb.append(", :id" + index);
200                                    }
201                                }
202                            }
203                            if (i == values.size() - 1) {
204                                sb.append(")");
205                            }
206                            index++;
207                            valArray.add(values.get(i));
208                            orArray.add(colName);
209                            colArray.add(colVar);
210                        }
211                    }
212                }
213            }
214        }
215
216        int realLen = 0;
217
218        Query q = null;
219        Query qTotal = null;
220        if (orArray.size() == 0) {
221            q = em.createNamedQuery("GET_WORKFLOWS_COLUMNS");
222            q.setFirstResult(start - 1);
223            q.setMaxResults(len);
224            qTotal = em.createNamedQuery("GET_WORKFLOWS_COUNT");
225        }
226        else {
227            if (orArray.size() > 0) {
228                StringBuilder sbTotal = new StringBuilder(sb);
229                sb.append(" order by w.createdTimestamp desc ");
230                q = em.createQuery(sb.toString());
231                q.setFirstResult(start - 1);
232                q.setMaxResults(len);
233                qTotal = em.createQuery(sbTotal.toString().replace(seletStr, countStr));
234                for (int i = 0; i < orArray.size(); i++) {
235                    q.setParameter(colArray.get(i), valArray.get(i));
236                    qTotal.setParameter(colArray.get(i), valArray.get(i));
237                }
238            }
239        }
240
241        OpenJPAQuery kq = OpenJPAPersistence.cast(q);
242        JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
243        fetch.setFetchBatchSize(20);
244        fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
245        fetch.setFetchDirection(FetchDirection.FORWARD);
246        fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
247        List<?> resultList = q.getResultList();
248        List<Object[]> objectArrList = (List<Object[]>) resultList;
249        List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
250
251        for (Object[] arr : objectArrList) {
252            WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
253            wfBeansList.add(ww);
254        }
255
256        realLen = ((Long) qTotal.getSingleResult()).intValue();
257
258        return new WorkflowsInfo(wfBeansList, start, len, realLen);
259    }
260
261    /* (non-Javadoc)
262     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
263     */
264    @Override
265    public String getName() {
266        return "WorkflowsJobGetJPAExecutor";
267    }
268
269    private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
270
271        WorkflowJobBean wfBean = new WorkflowJobBean();
272        wfBean.setId((String) arr[0]);
273        if (arr[1] != null) {
274            wfBean.setAppName((String) arr[1]);
275        }
276        if (arr[2] != null) {
277            wfBean.setStatus(Status.valueOf((String) arr[2]));
278        }
279        if (arr[3] != null) {
280            wfBean.setRun((Integer) arr[3]);
281        }
282        if (arr[4] != null) {
283            wfBean.setUser((String) arr[4]);
284        }
285        if (arr[5] != null) {
286            wfBean.setGroup((String) arr[5]);
287        }
288        if (arr[6] != null) {
289            wfBean.setCreatedTime((Timestamp) arr[6]);
290        }
291        if (arr[7] != null) {
292            wfBean.setStartTime((Timestamp) arr[7]);
293        }
294        if (arr[8] != null) {
295            wfBean.setLastModifiedTime((Timestamp) arr[8]);
296        }
297        if (arr[9] != null) {
298            wfBean.setEndTime((Timestamp) arr[9]);
299        }
300        if (arr[10] != null) {
301            wfBean.setExternalId((String) arr[10]);
302        }
303        if (arr[11] != null) {
304            wfBean.setParentId((String) arr[11]);
305        }
306        return wfBean;
307    }
308}