001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.store;
020
021import java.sql.Connection;
022import java.sql.SQLException;
023import java.sql.Timestamp;
024import java.util.ArrayList;
025import java.util.Date;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.Callable;
029
030import javax.persistence.EntityManager;
031import javax.persistence.Query;
032
033import org.apache.oozie.ErrorCode;
034import org.apache.oozie.WorkflowActionBean;
035import org.apache.oozie.WorkflowJobBean;
036import org.apache.oozie.WorkflowsInfo;
037import org.apache.oozie.client.OozieClient;
038import org.apache.oozie.client.WorkflowJob.Status;
039import org.apache.oozie.executor.jpa.JPAExecutorException;
040import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
041import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
042import org.apache.oozie.service.InstrumentationService;
043import org.apache.oozie.service.SchemaService;
044import org.apache.oozie.service.Services;
045import org.apache.oozie.service.SchemaService.SchemaName;
046import org.apache.oozie.util.Instrumentation;
047import org.apache.oozie.util.ParamChecker;
048import org.apache.oozie.util.XLog;
049import org.apache.oozie.workflow.WorkflowException;
050import org.apache.openjpa.persistence.OpenJPAEntityManager;
051import org.apache.openjpa.persistence.OpenJPAPersistence;
052import org.apache.openjpa.persistence.OpenJPAQuery;
053import org.apache.openjpa.persistence.jdbc.FetchDirection;
054import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
055import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
056import org.apache.openjpa.persistence.jdbc.ResultSetType;
057
058/**
059 * DB Implementation of Workflow Store
060 */
061public class WorkflowStore extends Store {
062    private Connection conn;
063    private EntityManager entityManager;
064    private boolean selectForUpdate;
065    private static final String INSTR_GROUP = "db";
066    public static final int LOCK_TIMEOUT = 50000;
067    private static final String seletStr = "Select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
068            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
069    private static final String countStr = "Select count(w) from WorkflowJobBean w";
070
071    public WorkflowStore() {
072    }
073
074    public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException {
075        super();
076        conn = ParamChecker.notNull(connection, "conn");
077        entityManager = getEntityManager();
078        this.selectForUpdate = selectForUpdate;
079    }
080
081    public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException {
082        super(store);
083        conn = ParamChecker.notNull(connection, "conn");
084        entityManager = getEntityManager();
085        this.selectForUpdate = selectForUpdate;
086    }
087
088    public WorkflowStore(boolean selectForUpdate) throws StoreException {
089        super();
090        entityManager = getEntityManager();
091        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
092        OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager);
093        conn = (Connection) kem.getConnection();
094        this.selectForUpdate = selectForUpdate;
095    }
096
097    public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException {
098        super(store);
099        entityManager = getEntityManager();
100        this.selectForUpdate = selectForUpdate;
101    }
102
103    /**
104     * Create a Workflow and return a WorkflowJobBean. It also creates the process instance for the job.
105     *
106     * @param workflow workflow bean
107     * @throws StoreException
108     */
109
110    public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException {
111        ParamChecker.notNull(workflow, "workflow");
112
113        doOperation("insertWorkflow", new Callable<Void>() {
114            public Void call() throws SQLException, StoreException, WorkflowException {
115                entityManager.persist(workflow);
116                return null;
117            }
118        });
119    }
120
121    /**
122     * Load the Workflow into a Bean and return it. Also load the Workflow Instance into the bean. And lock the Workflow
123     * depending on the locking parameter.
124     *
125     * @param id Workflow ID
126     * @param locking true if Workflow is to be locked
127     * @return WorkflowJobBean
128     * @throws StoreException
129     */
130    public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException {
131        ParamChecker.notEmpty(id, "WorkflowID");
132        WorkflowJobBean wfBean = doOperation("getWorkflow", new Callable<WorkflowJobBean>() {
133            public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException {
134                WorkflowJobBean wfBean = null;
135                wfBean = getWorkflowOnly(id, locking);
136                if (wfBean == null) {
137                    throw new StoreException(ErrorCode.E0604, id);
138                }
139                /*
140                 * WorkflowInstance wfInstance; //krishna and next line
141                 * wfInstance = workflowLib.get(id); wfInstance =
142                 * wfBean.get(wfBean.getWfInstance());
143                 * wfBean.setWorkflowInstance(wfInstance);
144                 * wfBean.setWfInstance(wfInstance);
145                 */
146                return wfBean;
147            }
148        });
149        return wfBean;
150    }
151
152    /**
153     * Get the number of Workflows with the given status.
154     *
155     * @param status Workflow Status.
156     * @return number of Workflows with given status.
157     * @throws StoreException
158     */
159    public int getWorkflowCountWithStatus(final String status) throws StoreException {
160        ParamChecker.notEmpty(status, "status");
161        Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() {
162            public Integer call() throws SQLException {
163                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS");
164                q.setParameter("status", status);
165                Long count = (Long) q.getSingleResult();
166                return Integer.valueOf(count.intValue());
167            }
168        });
169        return cnt.intValue();
170    }
171
172    /**
173     * Get the number of Workflows with the given status which was modified in given time limit.
174     *
175     * @param status Workflow Status.
176     * @param secs No. of seconds within which the workflow got modified.
177     * @return number of Workflows modified within given time with given status.
178     * @throws StoreException
179     */
180    public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException {
181        ParamChecker.notEmpty(status, "status");
182        ParamChecker.notEmpty(status, "secs");
183        Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() {
184            public Integer call() throws SQLException {
185                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS");
186                Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000));
187                q.setParameter("status", status);
188                q.setParameter("lastModTime", ts);
189                Long count = (Long) q.getSingleResult();
190                return Integer.valueOf(count.intValue());
191            }
192        });
193        return cnt.intValue();
194    }
195
196    /**
197     * Update the data from Workflow Bean to DB along with the workflow instance data. Action table is not updated
198     *
199     * @param wfBean Workflow Bean
200     * @throws StoreException If Workflow doesn't exist
201     */
202    public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
203        ParamChecker.notNull(wfBean, "WorkflowJobBean");
204        doOperation("updateWorkflow", new Callable<Void>() {
205            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
206                WorkflowJobQueryExecutor.getInstance().executeUpdate(
207                        WorkflowJobQueryExecutor.WorkflowJobQuery.UPDATE_WORKFLOW, wfBean);
208                return null;
209            }
210        });
211    }
212
213    /**
214     * Create a new Action record in the ACTIONS table with the given Bean.
215     *
216     * @param action WorkflowActionBean
217     * @throws StoreException If the action is already present
218     */
219    public void insertAction(final WorkflowActionBean action) throws StoreException {
220        ParamChecker.notNull(action, "WorkflowActionBean");
221        doOperation("insertAction", new Callable<Void>() {
222            public Void call() throws SQLException, StoreException, WorkflowException {
223                entityManager.persist(action);
224                return null;
225            }
226        });
227    }
228
229    /**
230     * Load the action data and returns a bean.
231     *
232     * @param id Action Id
233     * @param locking true if the action is to be locked
234     * @return Action Bean
235     * @throws StoreException If action doesn't exist
236     */
237    public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException {
238        ParamChecker.notEmpty(id, "ActionID");
239        WorkflowActionBean action = doOperation("getAction", new Callable<WorkflowActionBean>() {
240            public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException,
241                    InterruptedException {
242                Query q = entityManager.createNamedQuery("GET_ACTION");
243                /*
244                 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
245                 * FetchPlan fetch = oq.getFetchPlan();
246                 * fetch.setReadLockMode(LockModeType.WRITE);
247                 * fetch.setLockTimeout(1000); // 1 seconds }
248                 */
249                WorkflowActionBean action = null;
250                q.setParameter("id", id);
251                List<WorkflowActionBean> actions = q.getResultList();
252                // action = (WorkflowActionBean) q.getSingleResult();
253                if (actions.size() > 0) {
254                    action = actions.get(0);
255                }
256                else {
257                    throw new StoreException(ErrorCode.E0605, id);
258                }
259
260                /*
261                 * if (locking) return action; else
262                 */
263                // return action;
264                return getBeanForRunningAction(action);
265            }
266        });
267        return action;
268    }
269
270    /**
271     * Update the given action bean to DB.
272     *
273     * @param action Action Bean
274     * @throws StoreException if action doesn't exist
275     */
276    public void updateAction(final WorkflowActionBean action) throws StoreException {
277        ParamChecker.notNull(action, "WorkflowActionBean");
278        doOperation("updateAction", new Callable<Void>() {
279            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
280                WorkflowActionQueryExecutor.getInstance().executeUpdate(
281                        WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action);
282                return null;
283            }
284        });
285    }
286
287    /**
288     * Delete the Action with given id.
289     *
290     * @param id Action ID
291     * @throws StoreException if Action doesn't exist
292     */
293    public void deleteAction(final String id) throws StoreException {
294        ParamChecker.notEmpty(id, "ActionID");
295        doOperation("deleteAction", new Callable<Void>() {
296            public Void call() throws SQLException, StoreException, WorkflowException {
297                /*
298                 * Query q = entityManager.createNamedQuery("DELETE_ACTION");
299                 * q.setParameter("id", id); q.executeUpdate();
300                 */
301                WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id);
302                if (action != null) {
303                    entityManager.remove(action);
304                }
305                return null;
306            }
307        });
308    }
309
310    /**
311     * Loads all the actions for the given Workflow. Also locks all the actions if locking is true.
312     *
313     * @param wfId Workflow ID
314     * @param locking true if Actions are to be locked
315     * @return A List of WorkflowActionBean
316     * @throws StoreException
317     */
318    public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking)
319            throws StoreException {
320        ParamChecker.notEmpty(wfId, "WorkflowID");
321        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
322                                                       new Callable<List<WorkflowActionBean>>() {
323                                                           public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
324                                                                   InterruptedException {
325                                                               List<WorkflowActionBean> actions;
326                                                               List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
327                                                               try {
328                                                                   Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
329
330                                                                   /*
331                                                                   * OpenJPAQuery oq = OpenJPAPersistence.cast(q);
332                                                                   * if (locking) { //
333                                                                   * q.setHint("openjpa.FetchPlan.ReadLockMode"
334                                                                   * ,"WRITE"); FetchPlan fetch = oq.getFetchPlan();
335                                                                   * fetch.setReadLockMode(LockModeType.WRITE);
336                                                                   * fetch.setLockTimeout(1000); // 1 seconds }
337                                                                   */
338                                                                   q.setParameter("wfId", wfId);
339                                                                   actions = q.getResultList();
340                                                                   for (WorkflowActionBean a : actions) {
341                                                                       WorkflowActionBean aa = getBeanForRunningAction(a);
342                                                                       actionList.add(aa);
343                                                                   }
344                                                               }
345                                                               catch (IllegalStateException e) {
346                                                                   throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
347                                                               }
348                                                               /*
349                                                               * if (locking) { return actions; } else {
350                                                               */
351                                                               return actionList;
352                                                               // }
353                                                           }
354                                                       });
355        return actions;
356    }
357
358    /**
359     * Loads given number of actions for the given Workflow. Also locks all the actions if locking is true.
360     *
361     * @param wfId Workflow ID
362     * @param start offset for select statement
363     * @param len number of Workflow Actions to be returned
364     * @return A List of WorkflowActionBean
365     * @throws StoreException
366     */
367    public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len)
368            throws StoreException {
369        ParamChecker.notEmpty(wfId, "WorkflowID");
370        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
371                                                       new Callable<List<WorkflowActionBean>>() {
372                                                           public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
373                                                                   InterruptedException {
374                                                               List<WorkflowActionBean> actions;
375                                                               List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
376                                                               try {
377                                                                   Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
378                                                                   OpenJPAQuery oq = OpenJPAPersistence.cast(q);
379                                                                   q.setParameter("wfId", wfId);
380                                                                   q.setFirstResult(start - 1);
381                                                                   q.setMaxResults(len);
382                                                                   actions = q.getResultList();
383                                                                   for (WorkflowActionBean a : actions) {
384                                                                       WorkflowActionBean aa = getBeanForRunningAction(a);
385                                                                       actionList.add(aa);
386                                                                   }
387                                                               }
388                                                               catch (IllegalStateException e) {
389                                                                   throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
390                                                               }
391                                                               return actionList;
392                                                           }
393                                                       });
394        return actions;
395    }
396
397    /**
398     * Load All the actions that are pending for more than given time.
399     *
400     * @param minimumPendingAgeSecs Minimum Pending age in seconds
401     * @return List of action beans
402     * @throws StoreException
403     */
404    public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException {
405        List<WorkflowActionBean> actions = doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() {
406            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
407                Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
408                List<WorkflowActionBean> actionList = null;
409                try {
410                    Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS");
411                    q.setParameter("pendingAge", ts);
412                    actionList = q.getResultList();
413                }
414                catch (IllegalStateException e) {
415                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
416                }
417                return actionList;
418            }
419        });
420        return actions;
421    }
422
423    /**
424     * Load All the actions that are running and were last checked after now - miminumCheckAgeSecs
425     *
426     * @param checkAgeSecs check age in seconds.
427     * @return List of action beans.
428     * @throws StoreException
429     */
430    public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException {
431        List<WorkflowActionBean> actions = doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() {
432
433            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
434                List<WorkflowActionBean> actions = new ArrayList<WorkflowActionBean>();
435                Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
436                try {
437                    Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS");
438                    q.setParameter("lastCheckTime", ts);
439                    actions = q.getResultList();
440                }
441                catch (IllegalStateException e) {
442                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
443                }
444
445                return actions;
446            }
447        });
448        return actions;
449    }
450
451    /**
452     * Load All the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL.
453     *
454     * @param wfId String
455     * @return List of action beans
456     * @throws StoreException
457     */
458    public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException {
459        List<WorkflowActionBean> actions = doOperation("GET_RETRY_MANUAL_ACTIONS",
460                new Callable<List<WorkflowActionBean>>() {
461                    public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
462                        List<WorkflowActionBean> actionList = null;
463                        try {
464                            Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS");
465                            q.setParameter("wfId", wfId);
466                            actionList = q.getResultList();
467                        }
468                        catch (IllegalStateException e) {
469                            throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
470                        }
471
472                        return actionList;
473                    }
474                });
475        return actions;
476    }
477
478    /**
479     * Loads all the jobs that are satisfying the given filter condition. Filters can be applied on user, group,
480     * appName, status.
481     *
482     * @param filter Filter condition
483     * @param start offset for select statement
484     * @param len number of Workflows to be returned
485     * @return A list of workflows
486     * @throws StoreException
487     */
488    public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len)
489            throws StoreException {
490
491        WorkflowsInfo workFlowsInfo = doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() {
492            @SuppressWarnings("unchecked")
493            public WorkflowsInfo call() throws SQLException, StoreException {
494
495                List<String> orArray = new ArrayList<String>();
496                List<String> colArray = new ArrayList<String>();
497                List<String> valArray = new ArrayList<String>();
498                StringBuilder sb = new StringBuilder("");
499                boolean isStatus = false;
500                boolean isGroup = false;
501                boolean isAppName = false;
502                boolean isUser = false;
503                boolean isEnabled = false;
504                int index = 0;
505                for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
506                    String colName = null;
507                    String colVar = null;
508                    if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
509                        List<String> values = filter.get(OozieClient.FILTER_GROUP);
510                        colName = "group";
511                        for (int i = 0; i < values.size(); i++) {
512                            colVar = "group";
513                            colVar = colVar + index;
514                            if (!isEnabled && !isGroup) {
515                                sb.append(seletStr).append(" where w.group IN (:group" + index);
516                                isGroup = true;
517                                isEnabled = true;
518                            }
519                            else {
520                                if (isEnabled && !isGroup) {
521                                    sb.append(" and w.group IN (:group" + index);
522                                    isGroup = true;
523                                }
524                                else {
525                                    if (isGroup) {
526                                        sb.append(", :group" + index);
527                                    }
528                                }
529                            }
530                            if (i == values.size() - 1) {
531                                sb.append(")");
532                            }
533                            index++;
534                            valArray.add(values.get(i));
535                            orArray.add(colName);
536                            colArray.add(colVar);
537                        }
538                    }
539                    else {
540                        if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
541                            List<String> values = filter.get(OozieClient.FILTER_STATUS);
542                            colName = "status";
543                            for (int i = 0; i < values.size(); i++) {
544                                colVar = "status";
545                                colVar = colVar + index;
546                                if (!isEnabled && !isStatus) {
547                                    sb.append(seletStr).append(" where w.statusStr IN (:status" + index);
548                                    isStatus = true;
549                                    isEnabled = true;
550                                }
551                                else {
552                                    if (isEnabled && !isStatus) {
553                                        sb.append(" and w.statusStr IN (:status" + index);
554                                        isStatus = true;
555                                    }
556                                    else {
557                                        if (isStatus) {
558                                            sb.append(", :status" + index);
559                                        }
560                                    }
561                                }
562                                if (i == values.size() - 1) {
563                                    sb.append(")");
564                                }
565                                index++;
566                                valArray.add(values.get(i));
567                                orArray.add(colName);
568                                colArray.add(colVar);
569                            }
570                        }
571                        else {
572                            if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
573                                List<String> values = filter.get(OozieClient.FILTER_NAME);
574                                colName = "appName";
575                                for (int i = 0; i < values.size(); i++) {
576                                    colVar = "appName";
577                                    colVar = colVar + index;
578                                    if (!isEnabled && !isAppName) {
579                                        sb.append(seletStr).append(" where w.appName IN (:appName" + index);
580                                        isAppName = true;
581                                        isEnabled = true;
582                                    }
583                                    else {
584                                        if (isEnabled && !isAppName) {
585                                            sb.append(" and w.appName IN (:appName" + index);
586                                            isAppName = true;
587                                        }
588                                        else {
589                                            if (isAppName) {
590                                                sb.append(", :appName" + index);
591                                            }
592                                        }
593                                    }
594                                    if (i == values.size() - 1) {
595                                        sb.append(")");
596                                    }
597                                    index++;
598                                    valArray.add(values.get(i));
599                                    orArray.add(colName);
600                                    colArray.add(colVar);
601                                }
602                            }
603                            else {
604                                if (entry.getKey().equals(OozieClient.FILTER_USER)) {
605                                    List<String> values = filter.get(OozieClient.FILTER_USER);
606                                    colName = "user";
607                                    for (int i = 0; i < values.size(); i++) {
608                                        colVar = "user";
609                                        colVar = colVar + index;
610                                        if (!isEnabled && !isUser) {
611                                            sb.append(seletStr).append(" where w.user IN (:user" + index);
612                                            isUser = true;
613                                            isEnabled = true;
614                                        }
615                                        else {
616                                            if (isEnabled && !isUser) {
617                                                sb.append(" and w.user IN (:user" + index);
618                                                isUser = true;
619                                            }
620                                            else {
621                                                if (isUser) {
622                                                    sb.append(", :user" + index);
623                                                }
624                                            }
625                                        }
626                                        if (i == values.size() - 1) {
627                                            sb.append(")");
628                                        }
629                                        index++;
630                                        valArray.add(values.get(i));
631                                        orArray.add(colName);
632                                        colArray.add(colVar);
633                                    }
634                                }
635                            }
636                        }
637                    }
638                }
639
640                int realLen = 0;
641
642                Query q = null;
643                Query qTotal = null;
644                if (orArray.size() == 0) {
645                    q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS");
646                    q.setFirstResult(start - 1);
647                    q.setMaxResults(len);
648                    qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT");
649                }
650                else {
651                    if (orArray.size() > 0) {
652                        StringBuilder sbTotal = new StringBuilder(sb);
653                        sb.append(" order by w.startTimestamp desc ");
654                        XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
655                        q = entityManager.createQuery(sb.toString());
656                        q.setFirstResult(start - 1);
657                        q.setMaxResults(len);
658                        qTotal = entityManager.createQuery(sbTotal.toString().replace(seletStr, countStr));
659                        for (int i = 0; i < orArray.size(); i++) {
660                            q.setParameter(colArray.get(i), valArray.get(i));
661                            qTotal.setParameter(colArray.get(i), valArray.get(i));
662                        }
663                    }
664                }
665
666                OpenJPAQuery kq = OpenJPAPersistence.cast(q);
667                JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
668                fetch.setFetchBatchSize(20);
669                fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
670                fetch.setFetchDirection(FetchDirection.FORWARD);
671                fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
672                List<?> resultList = q.getResultList();
673                List<Object[]> objectArrList = (List<Object[]>) resultList;
674                List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
675
676                for (Object[] arr : objectArrList) {
677                    WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
678                    wfBeansList.add(ww);
679                }
680
681                realLen = ((Long) qTotal.getSingleResult()).intValue();
682
683                return new WorkflowsInfo(wfBeansList, start, len, realLen);
684            }
685        });
686        return workFlowsInfo;
687
688    }
689
690    /**
691     * Load the Workflow and all Action details and return a WorkflowJobBean. Workflow Instance is not loaded
692     *
693     * @param id Workflow Id
694     * @return Workflow Bean
695     * @throws StoreException If Workflow doesn't exist
696     */
697    public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException {
698        ParamChecker.notEmpty(id, "WorkflowID");
699        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
700            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
701                WorkflowJobBean wfBean = null;
702                wfBean = getWorkflowforInfo(id, false);
703                if (wfBean == null) {
704                    throw new StoreException(ErrorCode.E0604, id);
705                }
706                else {
707                    wfBean.setActions(getActionsForWorkflow(id, false));
708                }
709                return wfBean;
710            }
711        });
712        return wfBean;
713    }
714
715    /**
716     * Load the Workflow and subset Actions details and return a WorkflowJobBean. Workflow Instance is not loaded
717     *
718     * @param id Workflow Id
719     * @param start offset for select statement for actions
720     * @param len number of Workflow Actions to be returned
721     * @return Workflow Bean
722     * @throws StoreException If Workflow doesn't exist
723     */
724    public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len) throws StoreException {
725        ParamChecker.notEmpty(id, "WorkflowID");
726        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
727            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
728                WorkflowJobBean wfBean = null;
729                wfBean = getWorkflowforInfo(id, false);
730                if (wfBean == null) {
731                    throw new StoreException(ErrorCode.E0604, id);
732                }
733                else {
734                    wfBean.setActions(getActionsSubsetForWorkflow(id, start, len));
735                }
736                return wfBean;
737            }
738        });
739        return wfBean;
740    }
741
742    /**
743     * Get the Workflow ID with given external ID which will be assigned for the subworkflows.
744     *
745     * @param externalId external ID
746     * @return Workflow ID
747     * @throws StoreException if there is no job with external ID
748     */
749    public String getWorkflowIdForExternalId(final String externalId) throws StoreException {
750        ParamChecker.notEmpty(externalId, "externalId");
751        String wfId = doOperation("getWorkflowIdForExternalId", new Callable<String>() {
752            public String call() throws SQLException, StoreException {
753                String id = "";
754                Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID");
755                q.setParameter("externalId", externalId);
756                List<String> w = q.getResultList();
757                if (w.size() == 0) {
758                    id = "";
759                }
760                else {
761                    int index = w.size() - 1;
762                    id = w.get(index);
763                }
764                return id;
765            }
766        });
767        return wfId;
768    }
769
770    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
771
772    /**
773     * Purge the Workflows Completed older than given days.
774     *
775     * @param olderThanDays number of days for which to preserve the workflows
776     * @throws StoreException
777     */
778    public void purge(final long olderThanDays, final int limit) throws StoreException {
779        doOperation("purge", new Callable<Void>() {
780            public Void call() throws SQLException, StoreException, WorkflowException {
781                Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
782                Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN");
783                q.setParameter("endTime", maxEndTime);
784                q.setMaxResults(limit);
785                List<WorkflowJobBean> workflows = q.getResultList();
786                int actionDeleted = 0;
787                if (workflows.size() != 0) {
788                    for (WorkflowJobBean w : workflows) {
789                        String wfId = w.getId();
790                        entityManager.remove(w);
791                        Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
792                        g.setParameter("wfId", wfId);
793                        actionDeleted += g.executeUpdate();
794                    }
795                }
796                XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions " + actionDeleted);
797                return null;
798            }
799        });
800    }
801
802    private <V> V doOperation(String name, Callable<V> command) throws StoreException {
803        try {
804            Instrumentation.Cron cron = new Instrumentation.Cron();
805            cron.start();
806            V retVal;
807            try {
808                retVal = command.call();
809            }
810            finally {
811                cron.stop();
812            }
813            Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
814            return retVal;
815        }
816        catch (StoreException ex) {
817            throw ex;
818        }
819        catch (SQLException ex) {
820            throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
821        }
822        catch (Exception e) {
823            throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
824        }
825    }
826
827    private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException,
828            InterruptedException, StoreException {
829        WorkflowJobBean wfBean = null;
830        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
831        /*
832         * if (locking) { // q.setHint("openjpa.FetchPlan.ReadLockMode","READ");
833         * OpenJPAQuery oq = OpenJPAPersistence.cast(q); FetchPlan fetch =
834         * oq.getFetchPlan(); fetch.setReadLockMode(LockModeType.WRITE);
835         * fetch.setLockTimeout(-1); // unlimited }
836         */
837        q.setParameter("id", id);
838        List<WorkflowJobBean> w = q.getResultList();
839        if (w.size() > 0) {
840            wfBean = w.get(0);
841        }
842        return wfBean;
843        // return getBeanForRunningWorkflow(wfBean);
844    }
845
846    private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException,
847            InterruptedException, StoreException {
848        WorkflowJobBean wfBean = null;
849        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
850        q.setParameter("id", id);
851        List<WorkflowJobBean> w = q.getResultList();
852        if (w.size() > 0) {
853            wfBean = w.get(0);
854            return getBeanForRunningWorkflow(wfBean);
855        }
856        return null;
857    }
858
859    private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException {
860        WorkflowJobBean wfBean = new WorkflowJobBean();
861        wfBean.setId(w.getId());
862        wfBean.setAppName(w.getAppName());
863        wfBean.setAppPath(w.getAppPath());
864        wfBean.setConfBlob(w.getConfBlob());
865        wfBean.setGroup(w.getGroup());
866        wfBean.setRun(w.getRun());
867        wfBean.setUser(w.getUser());
868        wfBean.setCreatedTime(w.getCreatedTime());
869        wfBean.setEndTime(w.getEndTime());
870        wfBean.setExternalId(w.getExternalId());
871        wfBean.setLastModifiedTime(w.getLastModifiedTime());
872        wfBean.setLogToken(w.getLogToken());
873        wfBean.setProtoActionConfBlob(w.getProtoActionConfBlob());
874        wfBean.setSlaXmlBlob(w.getSlaXmlBlob());
875        wfBean.setStartTime(w.getStartTime());
876        wfBean.setStatus(w.getStatus());
877        wfBean.setWfInstanceBlob(w.getWfInstanceBlob());
878        return wfBean;
879    }
880
881    private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
882
883        WorkflowJobBean wfBean = new WorkflowJobBean();
884        wfBean.setId((String) arr[0]);
885        if (arr[1] != null) {
886            wfBean.setAppName((String) arr[1]);
887        }
888        if (arr[2] != null) {
889            wfBean.setStatus(Status.valueOf((String) arr[2]));
890        }
891        if (arr[3] != null) {
892            wfBean.setRun((Integer) arr[3]);
893        }
894        if (arr[4] != null) {
895            wfBean.setUser((String) arr[4]);
896        }
897        if (arr[5] != null) {
898            wfBean.setGroup((String) arr[5]);
899        }
900        if (arr[6] != null) {
901            wfBean.setCreatedTime((Timestamp) arr[6]);
902        }
903        if (arr[7] != null) {
904            wfBean.setStartTime((Timestamp) arr[7]);
905        }
906        if (arr[8] != null) {
907            wfBean.setLastModifiedTime((Timestamp) arr[8]);
908        }
909        if (arr[9] != null) {
910            wfBean.setEndTime((Timestamp) arr[9]);
911        }
912        return wfBean;
913    }
914
915    private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException {
916        if (a != null) {
917            WorkflowActionBean action = new WorkflowActionBean();
918            action.setId(a.getId());
919            action.setConfBlob(a.getConfBlob());
920            action.setConsoleUrl(a.getConsoleUrl());
921            action.setDataBlob(a.getDataBlob());
922            action.setStatsBlob(a.getStatsBlob());
923            action.setExternalChildIDsBlob(a.getExternalChildIDsBlob());
924            action.setErrorInfo(a.getErrorCode(), a.getErrorMessage());
925            action.setExternalId(a.getExternalId());
926            action.setExternalStatus(a.getExternalStatus());
927            action.setName(a.getName());
928            action.setCred(a.getCred());
929            action.setRetries(a.getRetries());
930            action.setTrackerUri(a.getTrackerUri());
931            action.setTransition(a.getTransition());
932            action.setType(a.getType());
933            action.setEndTime(a.getEndTime());
934            action.setExecutionPath(a.getExecutionPath());
935            action.setLastCheckTime(a.getLastCheckTime());
936            action.setLogToken(a.getLogToken());
937            if (a.isPending() == true) {
938                action.setPending();
939            }
940            action.setPendingAge(a.getPendingAge());
941            action.setSignalValue(a.getSignalValue());
942            action.setSlaXmlBlob(a.getSlaXmlBlob());
943            action.setStartTime(a.getStartTime());
944            action.setStatus(a.getStatus());
945            action.setJobId(a.getWfId());
946            action.setUserRetryCount(a.getUserRetryCount());
947            action.setUserRetryInterval(a.getUserRetryInterval());
948            action.setUserRetryMax(a.getUserRetryMax());
949            return action;
950        }
951        return null;
952    }
953}