001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.store;
019
020import java.sql.Connection;
021import java.sql.SQLException;
022import java.sql.Timestamp;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.Callable;
028
029import javax.persistence.EntityManager;
030import javax.persistence.Query;
031
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.WorkflowActionBean;
034import org.apache.oozie.WorkflowJobBean;
035import org.apache.oozie.WorkflowsInfo;
036import org.apache.oozie.client.OozieClient;
037import org.apache.oozie.client.WorkflowJob.Status;
038import org.apache.oozie.executor.jpa.JPAExecutorException;
039import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
040import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
041import org.apache.oozie.service.InstrumentationService;
042import org.apache.oozie.service.SchemaService;
043import org.apache.oozie.service.Services;
044import org.apache.oozie.service.SchemaService.SchemaName;
045import org.apache.oozie.util.Instrumentation;
046import org.apache.oozie.util.ParamChecker;
047import org.apache.oozie.util.XLog;
048import org.apache.oozie.workflow.WorkflowException;
049import org.apache.openjpa.persistence.OpenJPAEntityManager;
050import org.apache.openjpa.persistence.OpenJPAPersistence;
051import org.apache.openjpa.persistence.OpenJPAQuery;
052import org.apache.openjpa.persistence.jdbc.FetchDirection;
053import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
054import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
055import org.apache.openjpa.persistence.jdbc.ResultSetType;
056
057/**
058 * DB Implementation of Workflow Store
059 */
060public class WorkflowStore extends Store {
061    private Connection conn;
062    private EntityManager entityManager;
063    private boolean selectForUpdate;
064    private static final String INSTR_GROUP = "db";
065    public static final int LOCK_TIMEOUT = 50000;
066    private static final String seletStr = "Select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
067            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
068    private static final String countStr = "Select count(w) from WorkflowJobBean w";
069
070    public WorkflowStore() {
071    }
072
073    public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException {
074        super();
075        conn = ParamChecker.notNull(connection, "conn");
076        entityManager = getEntityManager();
077        this.selectForUpdate = selectForUpdate;
078    }
079
080    public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException {
081        super(store);
082        conn = ParamChecker.notNull(connection, "conn");
083        entityManager = getEntityManager();
084        this.selectForUpdate = selectForUpdate;
085    }
086
087    public WorkflowStore(boolean selectForUpdate) throws StoreException {
088        super();
089        entityManager = getEntityManager();
090        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
091        OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager);
092        conn = (Connection) kem.getConnection();
093        this.selectForUpdate = selectForUpdate;
094    }
095
096    public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException {
097        super(store);
098        entityManager = getEntityManager();
099        this.selectForUpdate = selectForUpdate;
100    }
101
102    /**
103     * Create a Workflow and return a WorkflowJobBean. It also creates the process instance for the job.
104     *
105     * @param workflow workflow bean
106     * @throws StoreException
107     */
108
109    public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException {
110        ParamChecker.notNull(workflow, "workflow");
111
112        doOperation("insertWorkflow", new Callable<Void>() {
113            public Void call() throws SQLException, StoreException, WorkflowException {
114                entityManager.persist(workflow);
115                return null;
116            }
117        });
118    }
119
120    /**
121     * Load the Workflow into a Bean and return it. Also load the Workflow Instance into the bean. And lock the Workflow
122     * depending on the locking parameter.
123     *
124     * @param id Workflow ID
125     * @param locking true if Workflow is to be locked
126     * @return WorkflowJobBean
127     * @throws StoreException
128     */
129    public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException {
130        ParamChecker.notEmpty(id, "WorkflowID");
131        WorkflowJobBean wfBean = doOperation("getWorkflow", new Callable<WorkflowJobBean>() {
132            public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException {
133                WorkflowJobBean wfBean = null;
134                wfBean = getWorkflowOnly(id, locking);
135                if (wfBean == null) {
136                    throw new StoreException(ErrorCode.E0604, id);
137                }
138                /*
139                 * WorkflowInstance wfInstance; //krishna and next line
140                 * wfInstance = workflowLib.get(id); wfInstance =
141                 * wfBean.get(wfBean.getWfInstance());
142                 * wfBean.setWorkflowInstance(wfInstance);
143                 * wfBean.setWfInstance(wfInstance);
144                 */
145                return wfBean;
146            }
147        });
148        return wfBean;
149    }
150
151    /**
152     * Get the number of Workflows with the given status.
153     *
154     * @param status Workflow Status.
155     * @return number of Workflows with given status.
156     * @throws StoreException
157     */
158    public int getWorkflowCountWithStatus(final String status) throws StoreException {
159        ParamChecker.notEmpty(status, "status");
160        Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() {
161            public Integer call() throws SQLException {
162                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS");
163                q.setParameter("status", status);
164                Long count = (Long) q.getSingleResult();
165                return Integer.valueOf(count.intValue());
166            }
167        });
168        return cnt.intValue();
169    }
170
171    /**
172     * Get the number of Workflows with the given status which was modified in given time limit.
173     *
174     * @param status Workflow Status.
175     * @param secs No. of seconds within which the workflow got modified.
176     * @return number of Workflows modified within given time with given status.
177     * @throws StoreException
178     */
179    public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException {
180        ParamChecker.notEmpty(status, "status");
181        ParamChecker.notEmpty(status, "secs");
182        Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() {
183            public Integer call() throws SQLException {
184                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS");
185                Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000));
186                q.setParameter("status", status);
187                q.setParameter("lastModTime", ts);
188                Long count = (Long) q.getSingleResult();
189                return Integer.valueOf(count.intValue());
190            }
191        });
192        return cnt.intValue();
193    }
194
195    /**
196     * Update the data from Workflow Bean to DB along with the workflow instance data. Action table is not updated
197     *
198     * @param wfBean Workflow Bean
199     * @throws StoreException If Workflow doesn't exist
200     */
201    public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
202        ParamChecker.notNull(wfBean, "WorkflowJobBean");
203        doOperation("updateWorkflow", new Callable<Void>() {
204            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
205                WorkflowJobQueryExecutor.getInstance().executeUpdate(
206                        WorkflowJobQueryExecutor.WorkflowJobQuery.UPDATE_WORKFLOW, wfBean);
207                return null;
208            }
209        });
210    }
211
212    /**
213     * Create a new Action record in the ACTIONS table with the given Bean.
214     *
215     * @param action WorkflowActionBean
216     * @throws StoreException If the action is already present
217     */
218    public void insertAction(final WorkflowActionBean action) throws StoreException {
219        ParamChecker.notNull(action, "WorkflowActionBean");
220        doOperation("insertAction", new Callable<Void>() {
221            public Void call() throws SQLException, StoreException, WorkflowException {
222                entityManager.persist(action);
223                return null;
224            }
225        });
226    }
227
228    /**
229     * Load the action data and returns a bean.
230     *
231     * @param id Action Id
232     * @param locking true if the action is to be locked
233     * @return Action Bean
234     * @throws StoreException If action doesn't exist
235     */
236    public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException {
237        ParamChecker.notEmpty(id, "ActionID");
238        WorkflowActionBean action = doOperation("getAction", new Callable<WorkflowActionBean>() {
239            public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException,
240                    InterruptedException {
241                Query q = entityManager.createNamedQuery("GET_ACTION");
242                /*
243                 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
244                 * FetchPlan fetch = oq.getFetchPlan();
245                 * fetch.setReadLockMode(LockModeType.WRITE);
246                 * fetch.setLockTimeout(1000); // 1 seconds }
247                 */
248                WorkflowActionBean action = null;
249                q.setParameter("id", id);
250                List<WorkflowActionBean> actions = q.getResultList();
251                // action = (WorkflowActionBean) q.getSingleResult();
252                if (actions.size() > 0) {
253                    action = actions.get(0);
254                }
255                else {
256                    throw new StoreException(ErrorCode.E0605, id);
257                }
258
259                /*
260                 * if (locking) return action; else
261                 */
262                // return action;
263                return getBeanForRunningAction(action);
264            }
265        });
266        return action;
267    }
268
269    /**
270     * Update the given action bean to DB.
271     *
272     * @param action Action Bean
273     * @throws StoreException if action doesn't exist
274     */
275    public void updateAction(final WorkflowActionBean action) throws StoreException {
276        ParamChecker.notNull(action, "WorkflowActionBean");
277        doOperation("updateAction", new Callable<Void>() {
278            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
279                WorkflowActionQueryExecutor.getInstance().executeUpdate(
280                        WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action);
281                return null;
282            }
283        });
284    }
285
286    /**
287     * Delete the Action with given id.
288     *
289     * @param id Action ID
290     * @throws StoreException if Action doesn't exist
291     */
292    public void deleteAction(final String id) throws StoreException {
293        ParamChecker.notEmpty(id, "ActionID");
294        doOperation("deleteAction", new Callable<Void>() {
295            public Void call() throws SQLException, StoreException, WorkflowException {
296                /*
297                 * Query q = entityManager.createNamedQuery("DELETE_ACTION");
298                 * q.setParameter("id", id); q.executeUpdate();
299                 */
300                WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id);
301                if (action != null) {
302                    entityManager.remove(action);
303                }
304                return null;
305            }
306        });
307    }
308
309    /**
310     * Loads all the actions for the given Workflow. Also locks all the actions if locking is true.
311     *
312     * @param wfId Workflow ID
313     * @param locking true if Actions are to be locked
314     * @return A List of WorkflowActionBean
315     * @throws StoreException
316     */
317    public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking)
318            throws StoreException {
319        ParamChecker.notEmpty(wfId, "WorkflowID");
320        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
321                                                       new Callable<List<WorkflowActionBean>>() {
322                                                           public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
323                                                                   InterruptedException {
324                                                               List<WorkflowActionBean> actions;
325                                                               List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
326                                                               try {
327                                                                   Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
328
329                                                                   /*
330                                                                   * OpenJPAQuery oq = OpenJPAPersistence.cast(q);
331                                                                   * if (locking) { //
332                                                                   * q.setHint("openjpa.FetchPlan.ReadLockMode"
333                                                                   * ,"WRITE"); FetchPlan fetch = oq.getFetchPlan();
334                                                                   * fetch.setReadLockMode(LockModeType.WRITE);
335                                                                   * fetch.setLockTimeout(1000); // 1 seconds }
336                                                                   */
337                                                                   q.setParameter("wfId", wfId);
338                                                                   actions = q.getResultList();
339                                                                   for (WorkflowActionBean a : actions) {
340                                                                       WorkflowActionBean aa = getBeanForRunningAction(a);
341                                                                       actionList.add(aa);
342                                                                   }
343                                                               }
344                                                               catch (IllegalStateException e) {
345                                                                   throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
346                                                               }
347                                                               /*
348                                                               * if (locking) { return actions; } else {
349                                                               */
350                                                               return actionList;
351                                                               // }
352                                                           }
353                                                       });
354        return actions;
355    }
356
357    /**
358     * Loads given number of actions for the given Workflow. Also locks all the actions if locking is true.
359     *
360     * @param wfId Workflow ID
361     * @param start offset for select statement
362     * @param len number of Workflow Actions to be returned
363     * @param locking true if Actions are to be locked
364     * @return A List of WorkflowActionBean
365     * @throws StoreException
366     */
367    public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len)
368            throws StoreException {
369        ParamChecker.notEmpty(wfId, "WorkflowID");
370        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
371                                                       new Callable<List<WorkflowActionBean>>() {
372                                                           public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
373                                                                   InterruptedException {
374                                                               List<WorkflowActionBean> actions;
375                                                               List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
376                                                               try {
377                                                                   Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
378                                                                   OpenJPAQuery oq = OpenJPAPersistence.cast(q);
379                                                                   q.setParameter("wfId", wfId);
380                                                                   q.setFirstResult(start - 1);
381                                                                   q.setMaxResults(len);
382                                                                   actions = q.getResultList();
383                                                                   for (WorkflowActionBean a : actions) {
384                                                                       WorkflowActionBean aa = getBeanForRunningAction(a);
385                                                                       actionList.add(aa);
386                                                                   }
387                                                               }
388                                                               catch (IllegalStateException e) {
389                                                                   throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
390                                                               }
391                                                               return actionList;
392                                                           }
393                                                       });
394        return actions;
395    }
396
397    /**
398     * Load All the actions that are pending for more than given time.
399     *
400     * @param minimumPendingAgeSecs Minimum Pending age in seconds
401     * @return List of action beans
402     * @throws StoreException
403     */
404    public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException {
405        List<WorkflowActionBean> actions = doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() {
406            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
407                Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
408                List<WorkflowActionBean> actionList = null;
409                try {
410                    Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS");
411                    q.setParameter("pendingAge", ts);
412                    actionList = q.getResultList();
413                }
414                catch (IllegalStateException e) {
415                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
416                }
417                return actionList;
418            }
419        });
420        return actions;
421    }
422
423    /**
424     * Load All the actions that are running and were last checked after now - miminumCheckAgeSecs
425     *
426     * @param checkAgeSecs check age in seconds.
427     * @return List of action beans.
428     * @throws StoreException
429     */
430    public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException {
431        List<WorkflowActionBean> actions = doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() {
432
433            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
434                List<WorkflowActionBean> actions = new ArrayList<WorkflowActionBean>();
435                Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
436                try {
437                    Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS");
438                    q.setParameter("lastCheckTime", ts);
439                    actions = q.getResultList();
440                }
441                catch (IllegalStateException e) {
442                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
443                }
444
445                return actions;
446            }
447        });
448        return actions;
449    }
450
451    /**
452     * Load All the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL.
453     *
454     * @param wfId String
455     * @return List of action beans
456     * @throws StoreException
457     */
458    public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException {
459        List<WorkflowActionBean> actions = doOperation("GET_RETRY_MANUAL_ACTIONS",
460                new Callable<List<WorkflowActionBean>>() {
461                    public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
462                        List<WorkflowActionBean> actionList = null;
463                        try {
464                            Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS");
465                            q.setParameter("wfId", wfId);
466                            actionList = q.getResultList();
467                        }
468                        catch (IllegalStateException e) {
469                            throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
470                        }
471
472                        return actionList;
473                    }
474                });
475        return actions;
476    }
477
478    /**
479     * Loads all the jobs that are satisfying the given filter condition. Filters can be applied on user, group,
480     * appName, status.
481     *
482     * @param filter Filter condition
483     * @param start offset for select statement
484     * @param len number of Workflows to be returned
485     * @return A list of workflows
486     * @throws StoreException
487     */
488    public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len)
489            throws StoreException {
490
491        WorkflowsInfo workFlowsInfo = doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() {
492            @SuppressWarnings("unchecked")
493            public WorkflowsInfo call() throws SQLException, StoreException {
494
495                List<String> orArray = new ArrayList<String>();
496                List<String> colArray = new ArrayList<String>();
497                List<String> valArray = new ArrayList<String>();
498                StringBuilder sb = new StringBuilder("");
499                boolean isStatus = false;
500                boolean isGroup = false;
501                boolean isAppName = false;
502                boolean isUser = false;
503                boolean isEnabled = false;
504                int index = 0;
505                for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
506                    String colName = null;
507                    String colVar = null;
508                    if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
509                        List<String> values = filter.get(OozieClient.FILTER_GROUP);
510                        colName = "group";
511                        for (int i = 0; i < values.size(); i++) {
512                            colVar = "group";
513                            colVar = colVar + index;
514                            if (!isEnabled && !isGroup) {
515                                sb.append(seletStr).append(" where w.group IN (:group" + index);
516                                isGroup = true;
517                                isEnabled = true;
518                            }
519                            else {
520                                if (isEnabled && !isGroup) {
521                                    sb.append(" and w.group IN (:group" + index);
522                                    isGroup = true;
523                                }
524                                else {
525                                    if (isGroup) {
526                                        sb.append(", :group" + index);
527                                    }
528                                }
529                            }
530                            if (i == values.size() - 1) {
531                                sb.append(")");
532                            }
533                            index++;
534                            valArray.add(values.get(i));
535                            orArray.add(colName);
536                            colArray.add(colVar);
537                        }
538                    }
539                    else {
540                        if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
541                            List<String> values = filter.get(OozieClient.FILTER_STATUS);
542                            colName = "status";
543                            for (int i = 0; i < values.size(); i++) {
544                                colVar = "status";
545                                colVar = colVar + index;
546                                if (!isEnabled && !isStatus) {
547                                    sb.append(seletStr).append(" where w.statusStr IN (:status" + index);
548                                    isStatus = true;
549                                    isEnabled = true;
550                                }
551                                else {
552                                    if (isEnabled && !isStatus) {
553                                        sb.append(" and w.statusStr IN (:status" + index);
554                                        isStatus = true;
555                                    }
556                                    else {
557                                        if (isStatus) {
558                                            sb.append(", :status" + index);
559                                        }
560                                    }
561                                }
562                                if (i == values.size() - 1) {
563                                    sb.append(")");
564                                }
565                                index++;
566                                valArray.add(values.get(i));
567                                orArray.add(colName);
568                                colArray.add(colVar);
569                            }
570                        }
571                        else {
572                            if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
573                                List<String> values = filter.get(OozieClient.FILTER_NAME);
574                                colName = "appName";
575                                for (int i = 0; i < values.size(); i++) {
576                                    colVar = "appName";
577                                    colVar = colVar + index;
578                                    if (!isEnabled && !isAppName) {
579                                        sb.append(seletStr).append(" where w.appName IN (:appName" + index);
580                                        isAppName = true;
581                                        isEnabled = true;
582                                    }
583                                    else {
584                                        if (isEnabled && !isAppName) {
585                                            sb.append(" and w.appName IN (:appName" + index);
586                                            isAppName = true;
587                                        }
588                                        else {
589                                            if (isAppName) {
590                                                sb.append(", :appName" + index);
591                                            }
592                                        }
593                                    }
594                                    if (i == values.size() - 1) {
595                                        sb.append(")");
596                                    }
597                                    index++;
598                                    valArray.add(values.get(i));
599                                    orArray.add(colName);
600                                    colArray.add(colVar);
601                                }
602                            }
603                            else {
604                                if (entry.getKey().equals(OozieClient.FILTER_USER)) {
605                                    List<String> values = filter.get(OozieClient.FILTER_USER);
606                                    colName = "user";
607                                    for (int i = 0; i < values.size(); i++) {
608                                        colVar = "user";
609                                        colVar = colVar + index;
610                                        if (!isEnabled && !isUser) {
611                                            sb.append(seletStr).append(" where w.user IN (:user" + index);
612                                            isUser = true;
613                                            isEnabled = true;
614                                        }
615                                        else {
616                                            if (isEnabled && !isUser) {
617                                                sb.append(" and w.user IN (:user" + index);
618                                                isUser = true;
619                                            }
620                                            else {
621                                                if (isUser) {
622                                                    sb.append(", :user" + index);
623                                                }
624                                            }
625                                        }
626                                        if (i == values.size() - 1) {
627                                            sb.append(")");
628                                        }
629                                        index++;
630                                        valArray.add(values.get(i));
631                                        orArray.add(colName);
632                                        colArray.add(colVar);
633                                    }
634                                }
635                            }
636                        }
637                    }
638                }
639
640                int realLen = 0;
641
642                Query q = null;
643                Query qTotal = null;
644                if (orArray.size() == 0) {
645                    q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS");
646                    q.setFirstResult(start - 1);
647                    q.setMaxResults(len);
648                    qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT");
649                }
650                else {
651                    if (orArray.size() > 0) {
652                        StringBuilder sbTotal = new StringBuilder(sb);
653                        sb.append(" order by w.startTimestamp desc ");
654                        XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
655                        q = entityManager.createQuery(sb.toString());
656                        q.setFirstResult(start - 1);
657                        q.setMaxResults(len);
658                        qTotal = entityManager.createQuery(sbTotal.toString().replace(seletStr, countStr));
659                        for (int i = 0; i < orArray.size(); i++) {
660                            q.setParameter(colArray.get(i), valArray.get(i));
661                            qTotal.setParameter(colArray.get(i), valArray.get(i));
662                        }
663                    }
664                }
665
666                OpenJPAQuery kq = OpenJPAPersistence.cast(q);
667                JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
668                fetch.setFetchBatchSize(20);
669                fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
670                fetch.setFetchDirection(FetchDirection.FORWARD);
671                fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
672                List<?> resultList = q.getResultList();
673                List<Object[]> objectArrList = (List<Object[]>) resultList;
674                List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
675
676                for (Object[] arr : objectArrList) {
677                    WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
678                    wfBeansList.add(ww);
679                }
680
681                realLen = ((Long) qTotal.getSingleResult()).intValue();
682
683                return new WorkflowsInfo(wfBeansList, start, len, realLen);
684            }
685        });
686        return workFlowsInfo;
687
688    }
689
690    /**
691     * Load the Workflow and all Action details and return a WorkflowJobBean. Workflow Instance is not loaded
692     *
693     * @param id Workflow Id
694     * @return Workflow Bean
695     * @throws StoreException If Workflow doesn't exist
696     */
697    public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException {
698        ParamChecker.notEmpty(id, "WorkflowID");
699        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
700            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
701                WorkflowJobBean wfBean = null;
702                wfBean = getWorkflowforInfo(id, false);
703                if (wfBean == null) {
704                    throw new StoreException(ErrorCode.E0604, id);
705                }
706                else {
707                    wfBean.setActions(getActionsForWorkflow(id, false));
708                }
709                return wfBean;
710            }
711        });
712        return wfBean;
713    }
714
715    /**
716     * Load the Workflow and subset Actions details and return a WorkflowJobBean. Workflow Instance is not loaded
717     *
718     * @param id Workflow Id
719     * @param start offset for select statement for actions
720     * @param len number of Workflow Actions to be returned
721     * @return Workflow Bean
722     * @throws StoreException If Workflow doesn't exist
723     */
724    public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len) throws StoreException {
725        ParamChecker.notEmpty(id, "WorkflowID");
726        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
727            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
728                WorkflowJobBean wfBean = null;
729                wfBean = getWorkflowforInfo(id, false);
730                if (wfBean == null) {
731                    throw new StoreException(ErrorCode.E0604, id);
732                }
733                else {
734                    wfBean.setActions(getActionsSubsetForWorkflow(id, start, len));
735                }
736                return wfBean;
737            }
738        });
739        return wfBean;
740    }
741
742    /**
743     * Get the Workflow ID with given external ID which will be assigned for the subworkflows.
744     *
745     * @param externalId external ID
746     * @return Workflow ID
747     * @throws StoreException if there is no job with external ID
748     */
749    public String getWorkflowIdForExternalId(final String externalId) throws StoreException {
750        ParamChecker.notEmpty(externalId, "externalId");
751        String wfId = doOperation("getWorkflowIdForExternalId", new Callable<String>() {
752            public String call() throws SQLException, StoreException {
753                String id = "";
754                Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID");
755                q.setParameter("externalId", externalId);
756                List<String> w = q.getResultList();
757                if (w.size() == 0) {
758                    id = "";
759                }
760                else {
761                    int index = w.size() - 1;
762                    id = w.get(index);
763                }
764                return id;
765            }
766        });
767        return wfId;
768    }
769
    /** Number of milliseconds in one day; used by purge to turn a day count into a cut-off time. */
    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
771
772    /**
773     * Purge the Workflows Completed older than given days.
774     *
775     * @param olderThanDays number of days for which to preserve the workflows
776     * @throws StoreException
777     */
778    public void purge(final long olderThanDays, final int limit) throws StoreException {
779        doOperation("purge", new Callable<Void>() {
780            public Void call() throws SQLException, StoreException, WorkflowException {
781                Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
782                Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN");
783                q.setParameter("endTime", maxEndTime);
784                q.setMaxResults(limit);
785                List<WorkflowJobBean> workflows = q.getResultList();
786                int actionDeleted = 0;
787                if (workflows.size() != 0) {
788                    for (WorkflowJobBean w : workflows) {
789                        String wfId = w.getId();
790                        entityManager.remove(w);
791                        Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
792                        g.setParameter("wfId", wfId);
793                        actionDeleted += g.executeUpdate();
794                    }
795                }
796                XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions " + actionDeleted);
797                return null;
798            }
799        });
800    }
801
802    private <V> V doOperation(String name, Callable<V> command) throws StoreException {
803        try {
804            Instrumentation.Cron cron = new Instrumentation.Cron();
805            cron.start();
806            V retVal;
807            try {
808                retVal = command.call();
809            }
810            finally {
811                cron.stop();
812            }
813            Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
814            return retVal;
815        }
816        catch (StoreException ex) {
817            throw ex;
818        }
819        catch (SQLException ex) {
820            throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
821        }
822        catch (Exception e) {
823            throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
824        }
825    }
826
827    private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException,
828            InterruptedException, StoreException {
829        WorkflowJobBean wfBean = null;
830        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
831        /*
832         * if (locking) { // q.setHint("openjpa.FetchPlan.ReadLockMode","READ");
833         * OpenJPAQuery oq = OpenJPAPersistence.cast(q); FetchPlan fetch =
834         * oq.getFetchPlan(); fetch.setReadLockMode(LockModeType.WRITE);
835         * fetch.setLockTimeout(-1); // unlimited }
836         */
837        q.setParameter("id", id);
838        List<WorkflowJobBean> w = q.getResultList();
839        if (w.size() > 0) {
840            wfBean = w.get(0);
841        }
842        return wfBean;
843        // return getBeanForRunningWorkflow(wfBean);
844    }
845
846    private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException,
847            InterruptedException, StoreException {
848        WorkflowJobBean wfBean = null;
849        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
850        q.setParameter("id", id);
851        List<WorkflowJobBean> w = q.getResultList();
852        if (w.size() > 0) {
853            wfBean = w.get(0);
854            return getBeanForRunningWorkflow(wfBean);
855        }
856        return null;
857    }
858
859    private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException {
860        WorkflowJobBean wfBean = new WorkflowJobBean();
861        wfBean.setId(w.getId());
862        wfBean.setAppName(w.getAppName());
863        wfBean.setAppPath(w.getAppPath());
864        wfBean.setConfBlob(w.getConfBlob());
865        wfBean.setGroup(w.getGroup());
866        wfBean.setRun(w.getRun());
867        wfBean.setUser(w.getUser());
868        wfBean.setCreatedTime(w.getCreatedTime());
869        wfBean.setEndTime(w.getEndTime());
870        wfBean.setExternalId(w.getExternalId());
871        wfBean.setLastModifiedTime(w.getLastModifiedTime());
872        wfBean.setLogToken(w.getLogToken());
873        wfBean.setProtoActionConfBlob(w.getProtoActionConfBlob());
874        wfBean.setSlaXmlBlob(w.getSlaXmlBlob());
875        wfBean.setStartTime(w.getStartTime());
876        wfBean.setStatus(w.getStatus());
877        wfBean.setWfInstanceBlob(w.getWfInstanceBlob());
878        return wfBean;
879    }
880
881    private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
882
883        WorkflowJobBean wfBean = new WorkflowJobBean();
884        wfBean.setId((String) arr[0]);
885        if (arr[1] != null) {
886            wfBean.setAppName((String) arr[1]);
887        }
888        if (arr[2] != null) {
889            wfBean.setStatus(Status.valueOf((String) arr[2]));
890        }
891        if (arr[3] != null) {
892            wfBean.setRun((Integer) arr[3]);
893        }
894        if (arr[4] != null) {
895            wfBean.setUser((String) arr[4]);
896        }
897        if (arr[5] != null) {
898            wfBean.setGroup((String) arr[5]);
899        }
900        if (arr[6] != null) {
901            wfBean.setCreatedTime((Timestamp) arr[6]);
902        }
903        if (arr[7] != null) {
904            wfBean.setStartTime((Timestamp) arr[7]);
905        }
906        if (arr[8] != null) {
907            wfBean.setLastModifiedTime((Timestamp) arr[8]);
908        }
909        if (arr[9] != null) {
910            wfBean.setEndTime((Timestamp) arr[9]);
911        }
912        return wfBean;
913    }
914
915    private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException {
916        if (a != null) {
917            WorkflowActionBean action = new WorkflowActionBean();
918            action.setId(a.getId());
919            action.setConfBlob(a.getConfBlob());
920            action.setConsoleUrl(a.getConsoleUrl());
921            action.setDataBlob(a.getDataBlob());
922            action.setStatsBlob(a.getStatsBlob());
923            action.setExternalChildIDsBlob(a.getExternalChildIDsBlob());
924            action.setErrorInfo(a.getErrorCode(), a.getErrorMessage());
925            action.setExternalId(a.getExternalId());
926            action.setExternalStatus(a.getExternalStatus());
927            action.setName(a.getName());
928            action.setCred(a.getCred());
929            action.setRetries(a.getRetries());
930            action.setTrackerUri(a.getTrackerUri());
931            action.setTransition(a.getTransition());
932            action.setType(a.getType());
933            action.setEndTime(a.getEndTime());
934            action.setExecutionPath(a.getExecutionPath());
935            action.setLastCheckTime(a.getLastCheckTime());
936            action.setLogToken(a.getLogToken());
937            if (a.isPending() == true) {
938                action.setPending();
939            }
940            action.setPendingAge(a.getPendingAge());
941            action.setSignalValue(a.getSignalValue());
942            action.setSlaXmlBlob(a.getSlaXmlBlob());
943            action.setStartTime(a.getStartTime());
944            action.setStatus(a.getStatus());
945            action.setJobId(a.getWfId());
946            action.setUserRetryCount(a.getUserRetryCount());
947            action.setUserRetryInterval(a.getUserRetryInterval());
948            action.setUserRetryMax(a.getUserRetryMax());
949            return action;
950        }
951        return null;
952    }
953}