001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.store;
019    
020    import java.sql.Connection;
021    import java.sql.SQLException;
022    import java.sql.Timestamp;
023    import java.util.ArrayList;
024    import java.util.Date;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.concurrent.Callable;
028    
029    import javax.persistence.EntityManager;
030    import javax.persistence.Query;
031    
032    import org.apache.oozie.ErrorCode;
033    import org.apache.oozie.WorkflowActionBean;
034    import org.apache.oozie.WorkflowJobBean;
035    import org.apache.oozie.WorkflowsInfo;
036    import org.apache.oozie.client.OozieClient;
037    import org.apache.oozie.client.WorkflowJob.Status;
038    import org.apache.oozie.service.InstrumentationService;
039    import org.apache.oozie.service.SchemaService;
040    import org.apache.oozie.service.Services;
041    import org.apache.oozie.service.SchemaService.SchemaName;
042    import org.apache.oozie.util.Instrumentation;
043    import org.apache.oozie.util.ParamChecker;
044    import org.apache.oozie.util.XLog;
045    import org.apache.oozie.workflow.WorkflowException;
046    import org.apache.openjpa.persistence.OpenJPAEntityManager;
047    import org.apache.openjpa.persistence.OpenJPAPersistence;
048    import org.apache.openjpa.persistence.OpenJPAQuery;
049    import org.apache.openjpa.persistence.jdbc.FetchDirection;
050    import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
051    import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
052    import org.apache.openjpa.persistence.jdbc.ResultSetType;
053    
054    /**
055     * DB Implementation of Workflow Store
056     */
057    public class WorkflowStore extends Store {
058        private Connection conn;
059        private EntityManager entityManager;
060        private boolean selectForUpdate;
061        private static final String INSTR_GROUP = "db";
062        public static final int LOCK_TIMEOUT = 50000;
063        private static final String seletStr = "Select w.id, w.appName, w.status, w.run, w.user, w.group, w.createdTimestamp, "
064                + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
065        private static final String countStr = "Select count(w) from WorkflowJobBean w";
066    
067        public WorkflowStore() {
068        }
069    
070        public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException {
071            super();
072            conn = ParamChecker.notNull(connection, "conn");
073            entityManager = getEntityManager();
074            this.selectForUpdate = selectForUpdate;
075        }
076    
077        public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException {
078            super(store);
079            conn = ParamChecker.notNull(connection, "conn");
080            entityManager = getEntityManager();
081            this.selectForUpdate = selectForUpdate;
082        }
083    
084        public WorkflowStore(boolean selectForUpdate) throws StoreException {
085            super();
086            entityManager = getEntityManager();
087            javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
088            OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager);
089            conn = (Connection) kem.getConnection();
090            this.selectForUpdate = selectForUpdate;
091        }
092    
093        public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException {
094            super(store);
095            entityManager = getEntityManager();
096            this.selectForUpdate = selectForUpdate;
097        }
098    
099        /**
100         * Create a Workflow and return a WorkflowJobBean. It also creates the process instance for the job.
101         *
102         * @param workflow workflow bean
103         * @throws StoreException
104         */
105    
106        public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException {
107            ParamChecker.notNull(workflow, "workflow");
108    
109            doOperation("insertWorkflow", new Callable<Void>() {
110                public Void call() throws SQLException, StoreException, WorkflowException {
111                    entityManager.persist(workflow);
112                    return null;
113                }
114            });
115        }
116    
117        /**
118         * Load the Workflow into a Bean and return it. Also load the Workflow Instance into the bean. And lock the Workflow
119         * depending on the locking parameter.
120         *
121         * @param id Workflow ID
122         * @param locking true if Workflow is to be locked
123         * @return WorkflowJobBean
124         * @throws StoreException
125         */
126        public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException {
127            ParamChecker.notEmpty(id, "WorkflowID");
128            WorkflowJobBean wfBean = doOperation("getWorkflow", new Callable<WorkflowJobBean>() {
129                public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException {
130                    WorkflowJobBean wfBean = null;
131                    wfBean = getWorkflowOnly(id, locking);
132                    if (wfBean == null) {
133                        throw new StoreException(ErrorCode.E0604, id);
134                    }
135                    /*
136                     * WorkflowInstance wfInstance; //krishna and next line
137                     * wfInstance = workflowLib.get(id); wfInstance =
138                     * wfBean.get(wfBean.getWfInstance());
139                     * wfBean.setWorkflowInstance(wfInstance);
140                     * wfBean.setWfInstance(wfInstance);
141                     */
142                    return wfBean;
143                }
144            });
145            return wfBean;
146        }
147    
148        /**
149         * Get the number of Workflows with the given status.
150         *
151         * @param status Workflow Status.
152         * @return number of Workflows with given status.
153         * @throws StoreException
154         */
155        public int getWorkflowCountWithStatus(final String status) throws StoreException {
156            ParamChecker.notEmpty(status, "status");
157            Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() {
158                public Integer call() throws SQLException {
159                    Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS");
160                    q.setParameter("status", status);
161                    Long count = (Long) q.getSingleResult();
162                    return Integer.valueOf(count.intValue());
163                }
164            });
165            return cnt.intValue();
166        }
167    
168        /**
169         * Get the number of Workflows with the given status which was modified in given time limit.
170         *
171         * @param status Workflow Status.
172         * @param secs No. of seconds within which the workflow got modified.
173         * @return number of Workflows modified within given time with given status.
174         * @throws StoreException
175         */
176        public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException {
177            ParamChecker.notEmpty(status, "status");
178            ParamChecker.notEmpty(status, "secs");
179            Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() {
180                public Integer call() throws SQLException {
181                    Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS");
182                    Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000));
183                    q.setParameter("status", status);
184                    q.setParameter("lastModTime", ts);
185                    Long count = (Long) q.getSingleResult();
186                    return Integer.valueOf(count.intValue());
187                }
188            });
189            return cnt.intValue();
190        }
191    
192        /**
193         * Update the data from Workflow Bean to DB along with the workflow instance data. Action table is not updated
194         *
195         * @param wfBean Workflow Bean
196         * @throws StoreException If Workflow doesn't exist
197         */
198        public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
199            ParamChecker.notNull(wfBean, "WorkflowJobBean");
200            doOperation("updateWorkflow", new Callable<Void>() {
201                public Void call() throws SQLException, StoreException, WorkflowException {
202                    Query q = entityManager.createNamedQuery("UPDATE_WORKFLOW");
203                    q.setParameter("id", wfBean.getId());
204                    setWFQueryParameters(wfBean, q);
205                    q.executeUpdate();
206                    return null;
207                }
208            });
209        }
210    
211        /**
212         * Create a new Action record in the ACTIONS table with the given Bean.
213         *
214         * @param action WorkflowActionBean
215         * @throws StoreException If the action is already present
216         */
217        public void insertAction(final WorkflowActionBean action) throws StoreException {
218            ParamChecker.notNull(action, "WorkflowActionBean");
219            doOperation("insertAction", new Callable<Void>() {
220                public Void call() throws SQLException, StoreException, WorkflowException {
221                    entityManager.persist(action);
222                    return null;
223                }
224            });
225        }
226    
227        /**
228         * Load the action data and returns a bean.
229         *
230         * @param id Action Id
231         * @param locking true if the action is to be locked
232         * @return Action Bean
233         * @throws StoreException If action doesn't exist
234         */
235        public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException {
236            ParamChecker.notEmpty(id, "ActionID");
237            WorkflowActionBean action = doOperation("getAction", new Callable<WorkflowActionBean>() {
238                public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException,
239                        InterruptedException {
240                    Query q = entityManager.createNamedQuery("GET_ACTION");
241                    /*
242                     * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
243                     * FetchPlan fetch = oq.getFetchPlan();
244                     * fetch.setReadLockMode(LockModeType.WRITE);
245                     * fetch.setLockTimeout(1000); // 1 seconds }
246                     */
247                    WorkflowActionBean action = null;
248                    q.setParameter("id", id);
249                    List<WorkflowActionBean> actions = q.getResultList();
250                    // action = (WorkflowActionBean) q.getSingleResult();
251                    if (actions.size() > 0) {
252                        action = actions.get(0);
253                    }
254                    else {
255                        throw new StoreException(ErrorCode.E0605, id);
256                    }
257    
258                    /*
259                     * if (locking) return action; else
260                     */
261                    // return action;
262                    return getBeanForRunningAction(action);
263                }
264            });
265            return action;
266        }
267    
268        /**
269         * Update the given action bean to DB.
270         *
271         * @param action Action Bean
272         * @throws StoreException if action doesn't exist
273         */
274        public void updateAction(final WorkflowActionBean action) throws StoreException {
275            ParamChecker.notNull(action, "WorkflowActionBean");
276            doOperation("updateAction", new Callable<Void>() {
277                public Void call() throws SQLException, StoreException, WorkflowException {
278                    Query q = entityManager.createNamedQuery("UPDATE_ACTION");
279                    q.setParameter("id", action.getId());
280                    setActionQueryParameters(action, q);
281                    q.executeUpdate();
282                    return null;
283                }
284            });
285        }
286    
287        /**
288         * Delete the Action with given id.
289         *
290         * @param id Action ID
291         * @throws StoreException if Action doesn't exist
292         */
293        public void deleteAction(final String id) throws StoreException {
294            ParamChecker.notEmpty(id, "ActionID");
295            doOperation("deleteAction", new Callable<Void>() {
296                public Void call() throws SQLException, StoreException, WorkflowException {
297                    /*
298                     * Query q = entityManager.createNamedQuery("DELETE_ACTION");
299                     * q.setParameter("id", id); q.executeUpdate();
300                     */
301                    WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id);
302                    if (action != null) {
303                        entityManager.remove(action);
304                    }
305                    return null;
306                }
307            });
308        }
309    
310        /**
311         * Loads all the actions for the given Workflow. Also locks all the actions if locking is true.
312         *
313         * @param wfId Workflow ID
314         * @param locking true if Actions are to be locked
315         * @return A List of WorkflowActionBean
316         * @throws StoreException
317         */
318        public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking)
319                throws StoreException {
320            ParamChecker.notEmpty(wfId, "WorkflowID");
321            List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
322                                                           new Callable<List<WorkflowActionBean>>() {
323                                                               public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
324                                                                       InterruptedException {
325                                                                   List<WorkflowActionBean> actions;
326                                                                   List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
327                                                                   try {
328                                                                       Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
329    
330                                                                       /*
331                                                                       * OpenJPAQuery oq = OpenJPAPersistence.cast(q);
332                                                                       * if (locking) { //
333                                                                       * q.setHint("openjpa.FetchPlan.ReadLockMode"
334                                                                       * ,"WRITE"); FetchPlan fetch = oq.getFetchPlan();
335                                                                       * fetch.setReadLockMode(LockModeType.WRITE);
336                                                                       * fetch.setLockTimeout(1000); // 1 seconds }
337                                                                       */
338                                                                       q.setParameter("wfId", wfId);
339                                                                       actions = q.getResultList();
340                                                                       for (WorkflowActionBean a : actions) {
341                                                                           WorkflowActionBean aa = getBeanForRunningAction(a);
342                                                                           actionList.add(aa);
343                                                                       }
344                                                                   }
345                                                                   catch (IllegalStateException e) {
346                                                                       throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
347                                                                   }
348                                                                   /*
349                                                                   * if (locking) { return actions; } else {
350                                                                   */
351                                                                   return actionList;
352                                                                   // }
353                                                               }
354                                                           });
355            return actions;
356        }
357    
358        /**
359         * Loads given number of actions for the given Workflow. Also locks all the actions if locking is true.
360         *
361         * @param wfId Workflow ID
362         * @param start offset for select statement
363         * @param len number of Workflow Actions to be returned
364         * @param locking true if Actions are to be locked
365         * @return A List of WorkflowActionBean
366         * @throws StoreException
367         */
368        public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len)
369                throws StoreException {
370            ParamChecker.notEmpty(wfId, "WorkflowID");
371            List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
372                                                           new Callable<List<WorkflowActionBean>>() {
373                                                               public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
374                                                                       InterruptedException {
375                                                                   List<WorkflowActionBean> actions;
376                                                                   List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
377                                                                   try {
378                                                                       Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
379                                                                       OpenJPAQuery oq = OpenJPAPersistence.cast(q);
380                                                                       q.setParameter("wfId", wfId);
381                                                                       q.setFirstResult(start - 1);
382                                                                       q.setMaxResults(len);
383                                                                       actions = q.getResultList();
384                                                                       for (WorkflowActionBean a : actions) {
385                                                                           WorkflowActionBean aa = getBeanForRunningAction(a);
386                                                                           actionList.add(aa);
387                                                                       }
388                                                                   }
389                                                                   catch (IllegalStateException e) {
390                                                                       throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
391                                                                   }
392                                                                   return actionList;
393                                                               }
394                                                           });
395            return actions;
396        }
397    
398        /**
399         * Load All the actions that are pending for more than given time.
400         *
401         * @param minimumPendingAgeSecs Minimum Pending age in seconds
402         * @return List of action beans
403         * @throws StoreException
404         */
405        public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException {
406            List<WorkflowActionBean> actions = doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() {
407                public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
408                    Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
409                    List<WorkflowActionBean> actionList = null;
410                    try {
411                        Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS");
412                        q.setParameter("pendingAge", ts);
413                        actionList = q.getResultList();
414                    }
415                    catch (IllegalStateException e) {
416                        throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
417                    }
418                    return actionList;
419                }
420            });
421            return actions;
422        }
423    
424        /**
425         * Load All the actions that are running and were last checked after now - miminumCheckAgeSecs
426         *
427         * @param checkAgeSecs check age in seconds.
428         * @return List of action beans.
429         * @throws StoreException
430         */
431        public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException {
432            List<WorkflowActionBean> actions = doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() {
433    
434                public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
435                    List<WorkflowActionBean> actions = new ArrayList<WorkflowActionBean>();
436                    Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
437                    try {
438                        Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS");
439                        q.setParameter("lastCheckTime", ts);
440                        actions = q.getResultList();
441                    }
442                    catch (IllegalStateException e) {
443                        throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
444                    }
445    
446                    return actions;
447                }
448            });
449            return actions;
450        }
451    
452        /**
453         * Load All the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL.
454         * 
455         * @param wfId String
456         * @return List of action beans
457         * @throws StoreException
458         */
459        public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException {
460            List<WorkflowActionBean> actions = doOperation("GET_RETRY_MANUAL_ACTIONS",
461                    new Callable<List<WorkflowActionBean>>() {
462                        public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
463                            List<WorkflowActionBean> actionList = null;
464                            try {
465                                Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS");
466                                q.setParameter("wfId", wfId);
467                                actionList = q.getResultList();
468                            }
469                            catch (IllegalStateException e) {
470                                throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
471                            }
472    
473                            return actionList;
474                        }
475                    });
476            return actions;
477        }
478    
479        /**
480         * Loads all the jobs that are satisfying the given filter condition. Filters can be applied on user, group,
481         * appName, status.
482         *
483         * @param filter Filter condition
484         * @param start offset for select statement
485         * @param len number of Workflows to be returned
486         * @return A list of workflows
487         * @throws StoreException
488         */
489        public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len)
490                throws StoreException {
491    
492            WorkflowsInfo workFlowsInfo = doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() {
493                @SuppressWarnings("unchecked")
494                public WorkflowsInfo call() throws SQLException, StoreException {
495    
496                    List<String> orArray = new ArrayList<String>();
497                    List<String> colArray = new ArrayList<String>();
498                    List<String> valArray = new ArrayList<String>();
499                    StringBuilder sb = new StringBuilder("");
500                    boolean isStatus = false;
501                    boolean isGroup = false;
502                    boolean isAppName = false;
503                    boolean isUser = false;
504                    boolean isEnabled = false;
505                    int index = 0;
506                    for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
507                        String colName = null;
508                        String colVar = null;
509                        if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
510                            List<String> values = filter.get(OozieClient.FILTER_GROUP);
511                            colName = "group";
512                            for (int i = 0; i < values.size(); i++) {
513                                colVar = "group";
514                                colVar = colVar + index;
515                                if (!isEnabled && !isGroup) {
516                                    sb.append(seletStr).append(" where w.group IN (:group" + index);
517                                    isGroup = true;
518                                    isEnabled = true;
519                                }
520                                else {
521                                    if (isEnabled && !isGroup) {
522                                        sb.append(" and w.group IN (:group" + index);
523                                        isGroup = true;
524                                    }
525                                    else {
526                                        if (isGroup) {
527                                            sb.append(", :group" + index);
528                                        }
529                                    }
530                                }
531                                if (i == values.size() - 1) {
532                                    sb.append(")");
533                                }
534                                index++;
535                                valArray.add(values.get(i));
536                                orArray.add(colName);
537                                colArray.add(colVar);
538                            }
539                        }
540                        else {
541                            if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
542                                List<String> values = filter.get(OozieClient.FILTER_STATUS);
543                                colName = "status";
544                                for (int i = 0; i < values.size(); i++) {
545                                    colVar = "status";
546                                    colVar = colVar + index;
547                                    if (!isEnabled && !isStatus) {
548                                        sb.append(seletStr).append(" where w.status IN (:status" + index);
549                                        isStatus = true;
550                                        isEnabled = true;
551                                    }
552                                    else {
553                                        if (isEnabled && !isStatus) {
554                                            sb.append(" and w.status IN (:status" + index);
555                                            isStatus = true;
556                                        }
557                                        else {
558                                            if (isStatus) {
559                                                sb.append(", :status" + index);
560                                            }
561                                        }
562                                    }
563                                    if (i == values.size() - 1) {
564                                        sb.append(")");
565                                    }
566                                    index++;
567                                    valArray.add(values.get(i));
568                                    orArray.add(colName);
569                                    colArray.add(colVar);
570                                }
571                            }
572                            else {
573                                if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
574                                    List<String> values = filter.get(OozieClient.FILTER_NAME);
575                                    colName = "appName";
576                                    for (int i = 0; i < values.size(); i++) {
577                                        colVar = "appName";
578                                        colVar = colVar + index;
579                                        if (!isEnabled && !isAppName) {
580                                            sb.append(seletStr).append(" where w.appName IN (:appName" + index);
581                                            isAppName = true;
582                                            isEnabled = true;
583                                        }
584                                        else {
585                                            if (isEnabled && !isAppName) {
586                                                sb.append(" and w.appName IN (:appName" + index);
587                                                isAppName = true;
588                                            }
589                                            else {
590                                                if (isAppName) {
591                                                    sb.append(", :appName" + index);
592                                                }
593                                            }
594                                        }
595                                        if (i == values.size() - 1) {
596                                            sb.append(")");
597                                        }
598                                        index++;
599                                        valArray.add(values.get(i));
600                                        orArray.add(colName);
601                                        colArray.add(colVar);
602                                    }
603                                }
604                                else {
605                                    if (entry.getKey().equals(OozieClient.FILTER_USER)) {
606                                        List<String> values = filter.get(OozieClient.FILTER_USER);
607                                        colName = "user";
608                                        for (int i = 0; i < values.size(); i++) {
609                                            colVar = "user";
610                                            colVar = colVar + index;
611                                            if (!isEnabled && !isUser) {
612                                                sb.append(seletStr).append(" where w.user IN (:user" + index);
613                                                isUser = true;
614                                                isEnabled = true;
615                                            }
616                                            else {
617                                                if (isEnabled && !isUser) {
618                                                    sb.append(" and w.user IN (:user" + index);
619                                                    isUser = true;
620                                                }
621                                                else {
622                                                    if (isUser) {
623                                                        sb.append(", :user" + index);
624                                                    }
625                                                }
626                                            }
627                                            if (i == values.size() - 1) {
628                                                sb.append(")");
629                                            }
630                                            index++;
631                                            valArray.add(values.get(i));
632                                            orArray.add(colName);
633                                            colArray.add(colVar);
634                                        }
635                                    }
636                                }
637                            }
638                        }
639                    }
640    
641                    int realLen = 0;
642    
643                    Query q = null;
644                    Query qTotal = null;
645                    if (orArray.size() == 0) {
646                        q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS");
647                        q.setFirstResult(start - 1);
648                        q.setMaxResults(len);
649                        qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT");
650                    }
651                    else {
652                        if (orArray.size() > 0) {
653                            StringBuilder sbTotal = new StringBuilder(sb);
654                            sb.append(" order by w.startTimestamp desc ");
655                            XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
656                            q = entityManager.createQuery(sb.toString());
657                            q.setFirstResult(start - 1);
658                            q.setMaxResults(len);
659                            qTotal = entityManager.createQuery(sbTotal.toString().replace(seletStr, countStr));
660                            for (int i = 0; i < orArray.size(); i++) {
661                                q.setParameter(colArray.get(i), valArray.get(i));
662                                qTotal.setParameter(colArray.get(i), valArray.get(i));
663                            }
664                        }
665                    }
666    
667                    OpenJPAQuery kq = OpenJPAPersistence.cast(q);
668                    JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
669                    fetch.setFetchBatchSize(20);
670                    fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
671                    fetch.setFetchDirection(FetchDirection.FORWARD);
672                    fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
673                    List<?> resultList = q.getResultList();
674                    List<Object[]> objectArrList = (List<Object[]>) resultList;
675                    List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
676    
677                    for (Object[] arr : objectArrList) {
678                        WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
679                        wfBeansList.add(ww);
680                    }
681    
682                    realLen = ((Long) qTotal.getSingleResult()).intValue();
683    
684                    return new WorkflowsInfo(wfBeansList, start, len, realLen);
685                }
686            });
687            return workFlowsInfo;
688    
689        }
690    
691        /**
692         * Load the Workflow and all Action details and return a WorkflowJobBean. Workflow Instance is not loaded
693         *
694         * @param id Workflow Id
695         * @return Workflow Bean
696         * @throws StoreException If Workflow doesn't exist
697         */
698        public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException {
699            ParamChecker.notEmpty(id, "WorkflowID");
700            WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
701                public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
702                    WorkflowJobBean wfBean = null;
703                    wfBean = getWorkflowforInfo(id, false);
704                    if (wfBean == null) {
705                        throw new StoreException(ErrorCode.E0604, id);
706                    }
707                    else {
708                        wfBean.setActions(getActionsForWorkflow(id, false));
709                    }
710                    return wfBean;
711                }
712            });
713            return wfBean;
714        }
715    
716        /**
717         * Load the Workflow and subset Actions details and return a WorkflowJobBean. Workflow Instance is not loaded
718         *
719         * @param id Workflow Id
720         * @param start offset for select statement for actions
721         * @param len number of Workflow Actions to be returned
722         * @return Workflow Bean
723         * @throws StoreException If Workflow doesn't exist
724         */
725        public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len) throws StoreException {
726            ParamChecker.notEmpty(id, "WorkflowID");
727            WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
728                public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
729                    WorkflowJobBean wfBean = null;
730                    wfBean = getWorkflowforInfo(id, false);
731                    if (wfBean == null) {
732                        throw new StoreException(ErrorCode.E0604, id);
733                    }
734                    else {
735                        wfBean.setActions(getActionsSubsetForWorkflow(id, start, len));
736                    }
737                    return wfBean;
738                }
739            });
740            return wfBean;
741        }
742    
743        /**
744         * Get the Workflow ID with given external ID which will be assigned for the subworkflows.
745         *
746         * @param externalId external ID
747         * @return Workflow ID
748         * @throws StoreException if there is no job with external ID
749         */
750        public String getWorkflowIdForExternalId(final String externalId) throws StoreException {
751            ParamChecker.notEmpty(externalId, "externalId");
752            String wfId = doOperation("getWorkflowIdForExternalId", new Callable<String>() {
753                public String call() throws SQLException, StoreException {
754                    String id = "";
755                    Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID");
756                    q.setParameter("externalId", externalId);
757                    List<String> w = q.getResultList();
758                    if (w.size() == 0) {
759                        id = "";
760                    }
761                    else {
762                        int index = w.size() - 1;
763                        id = w.get(index);
764                    }
765                    return id;
766                }
767            });
768            return wfId;
769        }
770    
771        private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
772    
773        /**
774         * Purge the Workflows Completed older than given days.
775         *
776         * @param olderThanDays number of days for which to preserve the workflows
777         * @throws StoreException
778         */
779        public void purge(final long olderThanDays, final int limit) throws StoreException {
780            doOperation("purge", new Callable<Void>() {
781                public Void call() throws SQLException, StoreException, WorkflowException {
782                    Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
783                    Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN");
784                    q.setParameter("endTime", maxEndTime);
785                    q.setMaxResults(limit);
786                    List<WorkflowJobBean> workflows = q.getResultList();
787                    int actionDeleted = 0;
788                    if (workflows.size() != 0) {
789                        for (WorkflowJobBean w : workflows) {
790                            String wfId = w.getId();
791                            entityManager.remove(w);
792                            Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
793                            g.setParameter("wfId", wfId);
794                            actionDeleted += g.executeUpdate();
795                        }
796                    }
797                    XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions " + actionDeleted);
798                    return null;
799                }
800            });
801        }
802    
803        private <V> V doOperation(String name, Callable<V> command) throws StoreException {
804            try {
805                Instrumentation.Cron cron = new Instrumentation.Cron();
806                cron.start();
807                V retVal;
808                try {
809                    retVal = command.call();
810                }
811                finally {
812                    cron.stop();
813                }
814                Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
815                return retVal;
816            }
817            catch (StoreException ex) {
818                throw ex;
819            }
820            catch (SQLException ex) {
821                throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
822            }
823            catch (Exception e) {
824                throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
825            }
826        }
827    
828        private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException,
829                InterruptedException, StoreException {
830            WorkflowJobBean wfBean = null;
831            Query q = entityManager.createNamedQuery("GET_WORKFLOW");
832            /*
833             * if (locking) { // q.setHint("openjpa.FetchPlan.ReadLockMode","READ");
834             * OpenJPAQuery oq = OpenJPAPersistence.cast(q); FetchPlan fetch =
835             * oq.getFetchPlan(); fetch.setReadLockMode(LockModeType.WRITE);
836             * fetch.setLockTimeout(-1); // unlimited }
837             */
838            q.setParameter("id", id);
839            List<WorkflowJobBean> w = q.getResultList();
840            if (w.size() > 0) {
841                wfBean = w.get(0);
842            }
843            return wfBean;
844            // return getBeanForRunningWorkflow(wfBean);
845        }
846    
847        private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException,
848                InterruptedException, StoreException {
849            WorkflowJobBean wfBean = null;
850            Query q = entityManager.createNamedQuery("GET_WORKFLOW");
851            q.setParameter("id", id);
852            List<WorkflowJobBean> w = q.getResultList();
853            if (w.size() > 0) {
854                wfBean = w.get(0);
855                return getBeanForRunningWorkflow(wfBean);
856            }
857            return null;
858        }
859    
860        private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException {
861            WorkflowJobBean wfBean = new WorkflowJobBean();
862            wfBean.setId(w.getId());
863            wfBean.setAppName(w.getAppName());
864            wfBean.setAppPath(w.getAppPath());
865            wfBean.setConf(w.getConf());
866            wfBean.setGroup(w.getGroup());
867            wfBean.setRun(w.getRun());
868            wfBean.setUser(w.getUser());
869            wfBean.setAuthToken(w.getAuthToken());
870            wfBean.setCreatedTime(w.getCreatedTime());
871            wfBean.setEndTime(w.getEndTime());
872            wfBean.setExternalId(w.getExternalId());
873            wfBean.setLastModifiedTime(w.getLastModifiedTime());
874            wfBean.setLogToken(w.getLogToken());
875            wfBean.setProtoActionConf(w.getProtoActionConf());
876            wfBean.setSlaXml(w.getSlaXml());
877            wfBean.setStartTime(w.getStartTime());
878            wfBean.setStatus(w.getStatus());
879            wfBean.setWfInstance(w.getWfInstance());
880            return wfBean;
881        }
882    
883        private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
884    
885            WorkflowJobBean wfBean = new WorkflowJobBean();
886            wfBean.setId((String) arr[0]);
887            if (arr[1] != null) {
888                wfBean.setAppName((String) arr[1]);
889            }
890            if (arr[2] != null) {
891                wfBean.setStatus(Status.valueOf((String) arr[2]));
892            }
893            if (arr[3] != null) {
894                wfBean.setRun((Integer) arr[3]);
895            }
896            if (arr[4] != null) {
897                wfBean.setUser((String) arr[4]);
898            }
899            if (arr[5] != null) {
900                wfBean.setGroup((String) arr[5]);
901            }
902            if (arr[6] != null) {
903                wfBean.setCreatedTime((Timestamp) arr[6]);
904            }
905            if (arr[7] != null) {
906                wfBean.setStartTime((Timestamp) arr[7]);
907            }
908            if (arr[8] != null) {
909                wfBean.setLastModifiedTime((Timestamp) arr[8]);
910            }
911            if (arr[9] != null) {
912                wfBean.setEndTime((Timestamp) arr[9]);
913            }
914            return wfBean;
915        }
916    
917        private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException {
918            if (a != null) {
919                WorkflowActionBean action = new WorkflowActionBean();
920                action.setId(a.getId());
921                action.setConf(a.getConf());
922                action.setConsoleUrl(a.getConsoleUrl());
923                action.setData(a.getData());
924                action.setStats(a.getStats());
925                action.setExternalChildIDs(a.getExternalChildIDs());
926                action.setErrorInfo(a.getErrorCode(), a.getErrorMessage());
927                action.setExternalId(a.getExternalId());
928                action.setExternalStatus(a.getExternalStatus());
929                action.setName(a.getName());
930                action.setCred(a.getCred());
931                action.setRetries(a.getRetries());
932                action.setTrackerUri(a.getTrackerUri());
933                action.setTransition(a.getTransition());
934                action.setType(a.getType());
935                action.setEndTime(a.getEndTime());
936                action.setExecutionPath(a.getExecutionPath());
937                action.setLastCheckTime(a.getLastCheckTime());
938                action.setLogToken(a.getLogToken());
939                if (a.getPending() == true) {
940                    action.setPending();
941                }
942                action.setPendingAge(a.getPendingAge());
943                action.setSignalValue(a.getSignalValue());
944                action.setSlaXml(a.getSlaXml());
945                action.setStartTime(a.getStartTime());
946                action.setStatus(a.getStatus());
947                action.setJobId(a.getWfId());
948                action.setUserRetryCount(a.getUserRetryCount());
949                action.setUserRetryInterval(a.getUserRetryInterval());
950                action.setUserRetryMax(a.getUserRetryMax());
951                return action;
952            }
953            return null;
954        }
955    
956        private void setWFQueryParameters(WorkflowJobBean wfBean, Query q) {
957            q.setParameter("appName", wfBean.getAppName());
958            q.setParameter("appPath", wfBean.getAppPath());
959            q.setParameter("conf", wfBean.getConf());
960            q.setParameter("groupName", wfBean.getGroup());
961            q.setParameter("run", wfBean.getRun());
962            q.setParameter("user", wfBean.getUser());
963            q.setParameter("authToken", wfBean.getAuthToken());
964            q.setParameter("createdTime", wfBean.getCreatedTimestamp());
965            q.setParameter("endTime", wfBean.getEndTimestamp());
966            q.setParameter("externalId", wfBean.getExternalId());
967            q.setParameter("lastModTime", new Date());
968            q.setParameter("logToken", wfBean.getLogToken());
969            q.setParameter("protoActionConf", wfBean.getProtoActionConf());
970            q.setParameter("slaXml", wfBean.getSlaXml());
971            q.setParameter("startTime", wfBean.getStartTimestamp());
972            q.setParameter("status", wfBean.getStatusStr());
973            q.setParameter("wfInstance", wfBean.getWfInstance());
974        }
975    
976        private void setActionQueryParameters(WorkflowActionBean aBean, Query q) {
977            q.setParameter("conf", aBean.getConf());
978            q.setParameter("consoleUrl", aBean.getConsoleUrl());
979            q.setParameter("data", aBean.getData());
980            q.setParameter("stats", aBean.getStats());
981            q.setParameter("externalChildIDs", aBean.getExternalChildIDs());
982            q.setParameter("errorCode", aBean.getErrorCode());
983            q.setParameter("errorMessage", aBean.getErrorMessage());
984            q.setParameter("externalId", aBean.getExternalId());
985            q.setParameter("externalStatus", aBean.getExternalStatus());
986            q.setParameter("name", aBean.getName());
987            q.setParameter("cred", aBean.getCred());
988            q.setParameter("retries", aBean.getRetries());
989            q.setParameter("trackerUri", aBean.getTrackerUri());
990            q.setParameter("transition", aBean.getTransition());
991            q.setParameter("type", aBean.getType());
992            q.setParameter("endTime", aBean.getEndTimestamp());
993            q.setParameter("executionPath", aBean.getExecutionPath());
994            q.setParameter("lastCheckTime", aBean.getLastCheckTimestamp());
995            q.setParameter("logToken", aBean.getLogToken());
996            q.setParameter("pending", aBean.isPending() ? 1 : 0);
997            q.setParameter("pendingAge", aBean.getPendingAgeTimestamp());
998            q.setParameter("signalValue", aBean.getSignalValue());
999            q.setParameter("slaXml", aBean.getSlaXml());
1000            q.setParameter("startTime", aBean.getStartTimestamp());
1001            q.setParameter("status", aBean.getStatusStr());
1002            q.setParameter("wfId", aBean.getWfId());
1003        }
1004    }