001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025import java.util.Map;
026
027import javax.persistence.EntityManager;
028import javax.persistence.Query;
029
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.WorkflowJobBean;
032import org.apache.oozie.WorkflowsInfo;
033import org.apache.oozie.client.OozieClient;
034import org.apache.oozie.client.WorkflowJob.Status;
035import org.apache.oozie.store.StoreStatusFilter;
036import org.apache.oozie.util.DateUtils;
037import org.apache.oozie.util.XLog;
038import org.apache.openjpa.persistence.OpenJPAPersistence;
039import org.apache.openjpa.persistence.OpenJPAQuery;
040import org.apache.openjpa.persistence.jdbc.FetchDirection;
041import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
042import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
043import org.apache.openjpa.persistence.jdbc.ResultSetType;
044
045public class WorkflowsJobGetJPAExecutor implements JPAExecutor<WorkflowsInfo> {
046
047    private static final String seletStr = "Select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
048            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp, w.externalId, w.parentId from WorkflowJobBean w";
049    private static final String countStr = "Select count(w) from WorkflowJobBean w";
050    public static final String DEFAULT_ORDER_BY = " order by w.createdTimestamp desc ";
051
052    private final Map<String, List<String>> filter;
053    private final int start;
054    private final int len;
055
056    /**
057     * This JPA Executor gets the workflows info for the range.
058     *
059     * @param filter
060     * @param start
061     * @param len
062     */
063    public WorkflowsJobGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
064        this.filter = filter;
065        this.start = start;
066        this.len = len;
067    }
068
069    /* (non-Javadoc)
070     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
071     */
072    @SuppressWarnings("unchecked")
073    @Override
074    public WorkflowsInfo execute(EntityManager em) throws JPAExecutorException {
075        List<String> orArray = new ArrayList<String>();
076        List<String> colArray = new ArrayList<String>();
077        List<Object> valArray = new ArrayList<Object>();
078        StringBuilder sb = new StringBuilder("");
079        String orderBy = DEFAULT_ORDER_BY;
080        boolean isStatus = false;
081        boolean isAppName = false;
082        boolean isUser = false;
083        boolean isEnabled = false;
084        boolean isId = false;
085        int index = 0;
086        for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
087            String colName = null;
088            String colVar = null;
089            if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
090                XLog.getLog(getClass()).warn("Filter by 'group' is not supported anymore");
091            } else {
092                if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
093                    List<String> values = filter.get(OozieClient.FILTER_STATUS);
094                    colName = "status";
095                    for (int i = 0; i < values.size(); i++) {
096                        colVar = "status";
097                        colVar = colVar + index;
098                        if (!isEnabled && !isStatus) {
099                            sb.append(seletStr).append(" where w.statusStr IN (:status" + index);
100                            isStatus = true;
101                            isEnabled = true;
102                        }
103                        else {
104                            if (isEnabled && !isStatus) {
105                                sb.append(" and w.statusStr IN (:status" + index);
106                                isStatus = true;
107                            }
108                            else {
109                                if (isStatus) {
110                                    sb.append(", :status" + index);
111                                }
112                            }
113                        }
114                        if (i == values.size() - 1) {
115                            sb.append(")");
116                        }
117                        index++;
118                        valArray.add(values.get(i));
119                        orArray.add(colName);
120                        colArray.add(colVar);
121                    }
122                }
123                else {
124                    if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
125                        List<String> values = filter.get(OozieClient.FILTER_NAME);
126                        colName = "appName";
127                        for (int i = 0; i < values.size(); i++) {
128                            colVar = "appName";
129                            colVar = colVar + index;
130                            if (!isEnabled && !isAppName) {
131                                sb.append(seletStr).append(" where w.appName IN (:appName" + index);
132                                isAppName = true;
133                                isEnabled = true;
134                            }
135                            else {
136                                if (isEnabled && !isAppName) {
137                                    sb.append(" and w.appName IN (:appName" + index);
138                                    isAppName = true;
139                                }
140                                else {
141                                    if (isAppName) {
142                                        sb.append(", :appName" + index);
143                                    }
144                                }
145                            }
146                            if (i == values.size() - 1) {
147                                sb.append(")");
148                            }
149                            index++;
150                            valArray.add(values.get(i));
151                            orArray.add(colName);
152                            colArray.add(colVar);
153                        }
154                    }
155                    else {
156                        if (entry.getKey().equals(OozieClient.FILTER_USER)) {
157                            List<String> values = filter.get(OozieClient.FILTER_USER);
158                            colName = "user";
159                            for (int i = 0; i < values.size(); i++) {
160                                colVar = "user";
161                                colVar = colVar + index;
162                                if (!isEnabled && !isUser) {
163                                    sb.append(seletStr).append(" where w.user IN (:user" + index);
164                                    isUser = true;
165                                    isEnabled = true;
166                                }
167                                else {
168                                    if (isEnabled && !isUser) {
169                                        sb.append(" and w.user IN (:user" + index);
170                                        isUser = true;
171                                    }
172                                    else {
173                                        if (isUser) {
174                                            sb.append(", :user" + index);
175                                        }
176                                    }
177                                }
178                                if (i == values.size() - 1) {
179                                    sb.append(")");
180                                }
181                                index++;
182                                valArray.add(values.get(i));
183                                orArray.add(colName);
184                                colArray.add(colVar);
185                            }
186                        }
187                    }
188                    if (entry.getKey().equals(OozieClient.FILTER_ID)) {
189                        List<String> values = filter.get(OozieClient.FILTER_ID);
190                        colName = "id";
191                        for (int i = 0; i < values.size(); i++) {
192                            colVar = "id";
193                            colVar = colVar + index;
194                            if (!isEnabled && !isId) {
195                                sb.append(seletStr).append(" where w.id IN (:id" + index);
196                                isId = true;
197                                isEnabled = true;
198                            }
199                            else {
200                                if (isEnabled && !isId) {
201                                    sb.append(" and w.id IN (:id" + index);
202                                    isId = true;
203                                }
204                                else {
205                                    if (isId) {
206                                        sb.append(", :id" + index);
207                                    }
208                                }
209                            }
210                            if (i == values.size() - 1) {
211                                sb.append(")");
212                            }
213                            index++;
214                            valArray.add(values.get(i));
215                            orArray.add(colName);
216                            colArray.add(colVar);
217                        }
218                    }
219                    if (entry.getKey().equalsIgnoreCase(OozieClient.FILTER_CREATED_TIME_START)) {
220                        List<String> values = filter.get(OozieClient.FILTER_CREATED_TIME_START);
221                        colName = "createdTimestampStart";
222                        if (values.size() > 1) {
223                            throw new JPAExecutorException(ErrorCode.E0302,
224                                    "cannot specify multiple startcreatedtime");
225                        }
226                        colVar = colName;
227                        colVar = colVar + index;
228                        if (!isEnabled) {
229                            sb.append(seletStr).append(" where w.createdTimestamp >= :" + colVar);
230                            isEnabled = true;
231                        }
232                        else {
233                            sb.append(" and w.createdTimestamp >= :" + colVar);
234                        }
235                        index++;
236                        Date createdTime = null;
237                        try {
238                            createdTime = parseCreatedTimeString(values.get(0));
239                        }
240                        catch (Exception e) {
241                            throw new JPAExecutorException(ErrorCode.E0302, e.getMessage());
242                        }
243                        Timestamp createdTimeStamp = new Timestamp(createdTime.getTime());
244                        valArray.add(createdTimeStamp);
245                        orArray.add(colName);
246                        colArray.add(colVar);
247
248                    }
249                    if (entry.getKey().equalsIgnoreCase(OozieClient.FILTER_CREATED_TIME_END)) {
250                        List<String> values = filter.get(OozieClient.FILTER_CREATED_TIME_END);
251                        colName = "createdTimestampEnd";
252                        if (values.size() > 1) {
253                            throw new JPAExecutorException(ErrorCode.E0302,
254                                    "cannot specify multiple endcreatedtime");
255                        }
256                        colVar = colName;
257                        colVar = colVar + index;
258                        if (!isEnabled) {
259                            sb.append(seletStr).append(" where w.createdTimestamp <= :" + colVar);
260                            isEnabled = true;
261                        }
262                        else {
263                            sb.append(" and w.createdTimestamp <= :" + colVar);
264                        }
265                        index++;
266                        Date createdTime = null;
267                        try {
268                            createdTime = parseCreatedTimeString(values.get(0));
269                        }
270                        catch (Exception e) {
271                            throw new JPAExecutorException(ErrorCode.E0302, e.getMessage());
272                        }
273                        Timestamp createdTimeStamp = new Timestamp(createdTime.getTime());
274                        valArray.add(createdTimeStamp);
275                        orArray.add(colName);
276                        colArray.add(colVar);
277                    }
278                }
279            }
280        }
281
282        orderBy = StoreStatusFilter.getSortBy(filter, orderBy);
283        int realLen = 0;
284
285        Query q = null;
286        Query qTotal = null;
287        if (orArray.size() == 0 && orderBy.equals(DEFAULT_ORDER_BY)) {
288            q = em.createNamedQuery("GET_WORKFLOWS_COLUMNS");
289            q.setFirstResult(start - 1);
290            q.setMaxResults(len);
291            qTotal = em.createNamedQuery("GET_WORKFLOWS_COUNT");
292        }
293        else {
294            sb = sb.toString().trim().length() == 0 ? sb.append(seletStr) : sb;
295            String sbTotal = sb.toString();
296            sb.append(orderBy);
297            q = em.createQuery(sb.toString());
298            q.setFirstResult(start - 1);
299            q.setMaxResults(len);
300            qTotal = em.createQuery(sbTotal.replace(seletStr, countStr));
301
302            for (int i = 0; i < orArray.size(); i++) {
303                q.setParameter(colArray.get(i), valArray.get(i));
304                qTotal.setParameter(colArray.get(i), valArray.get(i));
305            }
306        }
307
308        OpenJPAQuery kq = OpenJPAPersistence.cast(q);
309        JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
310        fetch.setFetchBatchSize(20);
311        fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
312        fetch.setFetchDirection(FetchDirection.FORWARD);
313        fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
314        List<?> resultList = q.getResultList();
315        List<Object[]> objectArrList = (List<Object[]>) resultList;
316        List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
317
318        for (Object[] arr : objectArrList) {
319            WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
320            wfBeansList.add(ww);
321        }
322
323        realLen = ((Long) qTotal.getSingleResult()).intValue();
324
325        return new WorkflowsInfo(wfBeansList, start, len, realLen);
326    }
327
328    /* (non-Javadoc)
329     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
330     */
331    @Override
332    public String getName() {
333        return "WorkflowsJobGetJPAExecutor";
334    }
335
336    private Date parseCreatedTimeString(String time) throws Exception{
337        Date createdTime = null;
338        int offset = 0;
339        if (Character.isLetter(time.charAt(time.length() - 1))) {
340            switch (time.charAt(time.length() - 1)) {
341                case 'd':
342                    offset = Integer.parseInt(time.substring(0, time.length() - 1));
343                    if(offset > 0) {
344                        throw new IllegalArgumentException("offset must be minus from currentTime.");
345                    }
346                    createdTime = org.apache.commons.lang.time.DateUtils.addDays(new Date(), offset);
347                    break;
348                case 'h':
349                    offset =  Integer.parseInt(time.substring(0, time.length() - 1));
350                    if(offset > 0) {
351                        throw new IllegalArgumentException("offset must be minus from currentTime.");
352                    }
353                    createdTime = org.apache.commons.lang.time.DateUtils.addHours(new Date(), offset);
354                    break;
355                case 'm':
356                    offset =  Integer.parseInt(time.substring(0, time.length() - 1));
357                    if(offset > 0) {
358                        throw new IllegalArgumentException("offset must be minus from currentTime.");
359                    }
360                    createdTime = org.apache.commons.lang.time.DateUtils.addMinutes(new Date(), offset);
361                    break;
362                case 'Z':
363                    createdTime = DateUtils.parseDateUTC(time);
364                    break;
365                default:
366                    throw new IllegalArgumentException("Unsupported time format: " + time + StoreStatusFilter.TIME_FORMAT);
367            }
368        } else {
369            throw new IllegalArgumentException("The format of time is wrong: " + time + StoreStatusFilter.TIME_FORMAT);
370        }
371        return createdTime;
372    }
373
374    private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
375
376        WorkflowJobBean wfBean = new WorkflowJobBean();
377        wfBean.setId((String) arr[0]);
378        if (arr[1] != null) {
379            wfBean.setAppName((String) arr[1]);
380        }
381        if (arr[2] != null) {
382            wfBean.setStatus(Status.valueOf((String) arr[2]));
383        }
384        if (arr[3] != null) {
385            wfBean.setRun((Integer) arr[3]);
386        }
387        if (arr[4] != null) {
388            wfBean.setUser((String) arr[4]);
389        }
390        if (arr[5] != null) {
391            wfBean.setGroup((String) arr[5]);
392        }
393        if (arr[6] != null) {
394            wfBean.setCreatedTime((Timestamp) arr[6]);
395        }
396        if (arr[7] != null) {
397            wfBean.setStartTime((Timestamp) arr[7]);
398        }
399        if (arr[8] != null) {
400            wfBean.setLastModifiedTime((Timestamp) arr[8]);
401        }
402        if (arr[9] != null) {
403            wfBean.setEndTime((Timestamp) arr[9]);
404        }
405        if (arr[10] != null) {
406            wfBean.setExternalId((String) arr[10]);
407        }
408        if (arr[11] != null) {
409            wfBean.setParentId((String) arr[11]);
410        }
411        return wfBean;
412    }
413}