001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.store;
019    
020    import java.sql.SQLException;
021    import java.sql.Timestamp;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.concurrent.Callable;
027    
028    import javax.persistence.EntityManager;
029    import javax.persistence.Query;
030    
031    import org.apache.oozie.CoordinatorActionBean;
032    import org.apache.oozie.CoordinatorJobBean;
033    import org.apache.oozie.CoordinatorJobInfo;
034    import org.apache.oozie.ErrorCode;
035    import org.apache.oozie.client.Job.Status;
036    import org.apache.oozie.client.CoordinatorJob.Timeunit;
037    import org.apache.oozie.service.InstrumentationService;
038    import org.apache.oozie.service.Services;
039    import org.apache.oozie.util.DateUtils;
040    import org.apache.oozie.util.Instrumentation;
041    import org.apache.oozie.util.ParamChecker;
042    import org.apache.oozie.util.XLog;
043    import org.apache.oozie.workflow.WorkflowException;
044    import org.apache.openjpa.persistence.OpenJPAPersistence;
045    import org.apache.openjpa.persistence.OpenJPAQuery;
046    import org.apache.openjpa.persistence.jdbc.FetchDirection;
047    import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
048    import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
049    import org.apache.openjpa.persistence.jdbc.ResultSetType;
050    
051    /**
052     * DB Implementation of Coord Store
053     */
054    public class CoordinatorStore extends Store {
055        private final XLog log = XLog.getLog(getClass());
056    
057        private EntityManager entityManager;
058        private static final String INSTR_GROUP = "db";
059        public static final int LOCK_TIMEOUT = 50000;
060        private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
061    
062        public CoordinatorStore(boolean selectForUpdate) throws StoreException {
063            super();
064            entityManager = getEntityManager();
065        }
066    
067        public CoordinatorStore(Store store, boolean selectForUpdate) throws StoreException {
068            super(store);
069            entityManager = getEntityManager();
070        }
071    
072        /**
073         * Create a CoordJobBean. It also creates the process instance for the job.
074         *
075         * @param workflow workflow bean
076         * @throws StoreException
077         */
078    
079        public void insertCoordinatorJob(final CoordinatorJobBean coordinatorJob) throws StoreException {
080            ParamChecker.notNull(coordinatorJob, "coordinatorJob");
081    
082            doOperation("insertCoordinatorJob", new Callable<Void>() {
083                public Void call() throws StoreException {
084                    entityManager.persist(coordinatorJob);
085                    return null;
086                }
087            });
088        }
089    
090        /**
091         * Load the CoordinatorJob into a Bean and return it. Also load the Workflow Instance into the bean. And lock the
092         * Workflow depending on the locking parameter.
093         *
094         * @param id Job ID
095         * @param locking Flag for Table Lock
096         * @return CoordinatorJobBean
097         * @throws StoreException
098         */
099        public CoordinatorJobBean getCoordinatorJob(final String id, final boolean locking) throws StoreException {
100            ParamChecker.notEmpty(id, "CoordJobId");
101            CoordinatorJobBean cjBean = doOperation("getCoordinatorJob", new Callable<CoordinatorJobBean>() {
102                @SuppressWarnings("unchecked")
103                public CoordinatorJobBean call() throws StoreException {
104                    Query q = entityManager.createNamedQuery("GET_COORD_JOB");
105                    q.setParameter("id", id);
106                    /*
107                     * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
108                     * // q.setHint("openjpa.FetchPlan.ReadLockMode","WRITE");
109                     * FetchPlan fetch = oq.getFetchPlan();
110                     * fetch.setReadLockMode(LockModeType.WRITE);
111                     * fetch.setLockTimeout(-1); // 1 second }
112                     */
113                    List<CoordinatorJobBean> cjBeans = q.getResultList();
114    
115                    if (cjBeans.size() > 0) {
116                        return cjBeans.get(0);
117                    }
118                    else {
119                        throw new StoreException(ErrorCode.E0604, id);
120                    }
121                }
122            });
123    
124            cjBean.setStatus(cjBean.getStatus());
125            return cjBean;
126        }
127    
128        /**
129         * Get a list of Coordinator Jobs that should be materialized. Jobs with a 'last materialized time' older than the
130         * argument will be returned.
131         *
132         * @param d Date
133         * @return List of Coordinator Jobs that have a last materialized time older than input date
134         * @throws StoreException
135         */
136        public List<CoordinatorJobBean> getCoordinatorJobsToBeMaterialized(final Date d, final int limit)
137                throws StoreException {
138    
139            ParamChecker.notNull(d, "Coord Job Materialization Date");
140            List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsToBeMaterialized",
141                                                                                      new Callable<List<CoordinatorJobBean>>() {
142                                                                                          public List<CoordinatorJobBean> call() throws StoreException {
143    
144                                                                                              List<CoordinatorJobBean> cjBeans;
145                                                                                              List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
146                                                                                              try {
147                                                                                                  Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN");
148                                                                                                  q.setParameter("matTime", new Timestamp(d.getTime()));
149                                                                                                  if (limit > 0) {
150                                                                                                      q.setMaxResults(limit);
151                                                                                                  }
152                                                                                                  /*
153                                                                                                  OpenJPAQuery oq = OpenJPAPersistence.cast(q);
154                                                                                                  FetchPlan fetch = oq.getFetchPlan();
155                                                                                                  fetch.setReadLockMode(LockModeType.WRITE);
156                                                                                                  fetch.setLockTimeout(-1); // no limit
157                                                                                                  */
158                                                                                                  cjBeans = q.getResultList();
159                                                                                                  // copy results to a new object
160                                                                                                  for (CoordinatorJobBean j : cjBeans) {
161                                                                                                      jobList.add(j);
162                                                                                                  }
163                                                                                              }
164                                                                                              catch (IllegalStateException e) {
165                                                                                                  throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
166                                                                                              }
167                                                                                              return jobList;
168    
169                                                                                          }
170                                                                                      });
171            return cjBeans;
172        }
173    
174        /**
175         * A list of Coordinator Jobs that are matched with the status and have last materialized time' older than
176         * checkAgeSecs will be returned.
177         *
178         * @param checkAgeSecs Job age in Seconds
179         * @param status Coordinator Job Status
180         * @param limit Number of results to return
181         * @param locking Flag for Table Lock
182         * @return List of Coordinator Jobs that are matched with the parameters.
183         * @throws StoreException
184         */
185        public List<CoordinatorJobBean> getCoordinatorJobsOlderThanStatus(final long checkAgeSecs, final String status,
186                                                                          final int limit, final boolean locking) throws StoreException {
187    
188            ParamChecker.notNull(status, "Coord Job Status");
189            List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsOlderThanStatus",
190                                                                                      new Callable<List<CoordinatorJobBean>>() {
191                                                                                          public List<CoordinatorJobBean> call() throws StoreException {
192    
193                                                                                              List<CoordinatorJobBean> cjBeans;
194                                                                                              List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
195                                                                                              try {
196                                                                                                  Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN_STATUS");
197                                                                                                  Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
198                                                                                                  q.setParameter("lastModTime", ts);
199                                                                                                  q.setParameter("status", status);
200                                                                                                  if (limit > 0) {
201                                                                                                      q.setMaxResults(limit);
202                                                                                                  }
203                                                                                                  /*
204                                                                                                  * if (locking) { OpenJPAQuery oq =
205                                                                                                  * OpenJPAPersistence.cast(q); FetchPlan fetch =
206                                                                                                  * oq.getFetchPlan();
207                                                                                                  * fetch.setReadLockMode(LockModeType.WRITE);
208                                                                                                  * fetch.setLockTimeout(-1); // no limit }
209                                                                                                  */
210                                                                                                  cjBeans = q.getResultList();
211                                                                                                  for (CoordinatorJobBean j : cjBeans) {
212                                                                                                      jobList.add(j);
213                                                                                                  }
214                                                                                              }
215                                                                                              catch (Exception e) {
216                                                                                                  throw new StoreException(ErrorCode.E0603, e.getMessage(), e);
217                                                                                              }
218                                                                                              return jobList;
219    
220                                                                                          }
221                                                                                      });
222            return cjBeans;
223        }
224    
225        /**
226         * Load the CoordinatorAction into a Bean and return it.
227         *
228         * @param id action ID
229         * @return CoordinatorActionBean
230         * @throws StoreException
231         */
232        public CoordinatorActionBean getCoordinatorAction(final String id, final boolean locking) throws StoreException {
233            ParamChecker.notEmpty(id, "actionID");
234            CoordinatorActionBean caBean = doOperation("getCoordinatorAction", new Callable<CoordinatorActionBean>() {
235                public CoordinatorActionBean call() throws StoreException {
236                    Query q = entityManager.createNamedQuery("GET_COORD_ACTION");
237                    q.setParameter("id", id);
238                    OpenJPAQuery oq = OpenJPAPersistence.cast(q);
239                    /*
240                     * if (locking) { //q.setHint("openjpa.FetchPlan.ReadLockMode",
241                     * "WRITE"); FetchPlan fetch = oq.getFetchPlan();
242                     * fetch.setReadLockMode(LockModeType.WRITE);
243                     * fetch.setLockTimeout(-1); // no limit }
244                     */
245    
246                    CoordinatorActionBean action = null;
247                    List<CoordinatorActionBean> actions = q.getResultList();
248                    if (actions.size() > 0) {
249                        action = actions.get(0);
250                    }
251                    else {
252                        throw new StoreException(ErrorCode.E0605, id);
253                    }
254    
255                    /*
256                     * if (locking) return action; else
257                     */
258                    return getBeanForRunningCoordAction(action);
259                }
260            });
261            return caBean;
262        }
263    
264        /**
265         * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
266         * concurrency number. Sort returned actions based on execution order (FIFO, LIFO, LAST_ONLY)
267         *
268         * @param id job ID
269         * @param numResults number of results to return
270         * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY
271         * @return List of CoordinatorActionBean
272         * @throws StoreException
273         */
274        public List<CoordinatorActionBean> getCoordinatorActionsForJob(final String id, final int numResults,
275                                                                       final String executionOrder) throws StoreException {
276            ParamChecker.notEmpty(id, "jobID");
277            List<CoordinatorActionBean> caBeans = doOperation("getCoordinatorActionsForJob",
278                                                              new Callable<List<CoordinatorActionBean>>() {
279                                                                  public List<CoordinatorActionBean> call() throws StoreException {
280    
281                                                                      List<CoordinatorActionBean> caBeans;
282                                                                      Query q;
283                                                                      // check if executionOrder is FIFO, LIFO, or LAST_ONLY
284                                                                      if (executionOrder.equalsIgnoreCase("FIFO")) {
285                                                                          q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
286                                                                      }
287                                                                      else {
288                                                                          q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO");
289                                                                      }
290                                                                      q.setParameter("jobId", id);
291                                                                      // if executionOrder is LAST_ONLY, only retrieve first
292                                                                      // record in LIFO,
293                                                                      // otherwise, use numResults if it is positive.
294                                                                      if (executionOrder.equalsIgnoreCase("LAST_ONLY")) {
295                                                                          q.setMaxResults(1);
296                                                                      }
297                                                                      else {
298                                                                          if (numResults > 0) {
299                                                                              q.setMaxResults(numResults);
300                                                                          }
301                                                                      }
302                                                                      caBeans = q.getResultList();
303                                                                      return caBeans;
304                                                                  }
305                                                              });
306            return caBeans;
307        }
308    
309        /**
310         * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
311         * concurrency number.
312         *
313         * @param id job ID
314         * @return Number of running actions
315         * @throws StoreException
316         */
317        public int getCoordinatorRunningActionsCount(final String id) throws StoreException {
318            ParamChecker.notEmpty(id, "jobID");
319            Integer cnt = doOperation("getCoordinatorRunningActionsCount", new Callable<Integer>() {
320                public Integer call() throws SQLException {
321    
322                    Query q = entityManager.createNamedQuery("GET_COORD_RUNNING_ACTIONS_COUNT");
323    
324                    q.setParameter("jobId", id);
325                    Long count = (Long) q.getSingleResult();
326                    return Integer.valueOf(count.intValue());
327                }
328            });
329            return cnt.intValue();
330        }
331    
332        /**
333         * Create a new Action record in the ACTIONS table with the given Bean.
334         *
335         * @param action WorkflowActionBean
336         * @throws StoreException If the action is already present
337         */
338        public void insertCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
339            ParamChecker.notNull(action, "CoordinatorActionBean");
340            doOperation("insertCoordinatorAction", new Callable<Void>() {
341                public Void call() throws StoreException {
342                    entityManager.persist(action);
343                    return null;
344                }
345            });
346        }
347    
348        /**
349         * Update the given action bean to DB.
350         *
351         * @param action Action Bean
352         * @throws StoreException if action doesn't exist
353         */
354        public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
355            ParamChecker.notNull(action, "CoordinatorActionBean");
356            doOperation("updateCoordinatorAction", new Callable<Void>() {
357                public Void call() throws StoreException {
358                    Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION");
359                    q.setParameter("id", action.getId());
360                    setActionQueryParameters(action, q);
361                    q.executeUpdate();
362                    return null;
363                }
364            });
365        }
366    
367        /**
368         * Update the given action bean to DB.
369         *
370         * @param action Action Bean
371         * @throws StoreException if action doesn't exist
372         */
373        public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException {
374            ParamChecker.notNull(action, "CoordinatorActionBean");
375            doOperation("updateCoordinatorAction", new Callable<Void>() {
376                public Void call() throws StoreException {
377                    Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION_MIN");
378                    q.setParameter("id", action.getId());
379                    q.setParameter("missingDependencies", action.getMissingDependencies());
380                    q.setParameter("lastModifiedTime", new Date());
381                    q.setParameter("status", action.getStatus().toString());
382                    q.setParameter("actionXml", action.getActionXml());
383                    q.executeUpdate();
384                    return null;
385                }
386            });
387        }
388    
389        /**
390         * Update the given coordinator job bean to DB.
391         *
392         * @param jobbean Coordinator Job Bean
393         * @throws StoreException if action doesn't exist
394         */
395        public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException {
396            ParamChecker.notNull(job, "CoordinatorJobBean");
397            doOperation("updateJob", new Callable<Void>() {
398                public Void call() throws StoreException {
399                    Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB");
400                    q.setParameter("id", job.getId());
401                    setJobQueryParameters(job, q);
402                    q.executeUpdate();
403                    return null;
404                }
405            });
406        }
407    
408        public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException {
409            ParamChecker.notNull(job, "CoordinatorJobBean");
410            doOperation("updateJobStatus", new Callable<Void>() {
411                public Void call() throws StoreException {
412                    Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB_STATUS");
413                    q.setParameter("id", job.getId());
414                    q.setParameter("status", job.getStatus().toString());
415                    q.setParameter("lastModifiedTime", new Date());
416                    q.executeUpdate();
417                    return null;
418                }
419            });
420        }
421    
422        private <V> V doOperation(String name, Callable<V> command) throws StoreException {
423            try {
424                Instrumentation.Cron cron = new Instrumentation.Cron();
425                cron.start();
426                V retVal;
427                try {
428                    retVal = command.call();
429                }
430                finally {
431                    cron.stop();
432                }
433                Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
434                return retVal;
435            }
436            catch (StoreException ex) {
437                throw ex;
438            }
439            catch (SQLException ex) {
440                throw new StoreException(ErrorCode.E0603, name, ex.getMessage(), ex);
441            }
442            catch (Exception e) {
443                throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
444            }
445        }
446    
447        private void setJobQueryParameters(CoordinatorJobBean jBean, Query q) {
448            q.setParameter("appName", jBean.getAppName());
449            q.setParameter("appPath", jBean.getAppPath());
450            q.setParameter("concurrency", jBean.getConcurrency());
451            q.setParameter("conf", jBean.getConf());
452            q.setParameter("externalId", jBean.getExternalId());
453            q.setParameter("frequency", jBean.getFrequency());
454            q.setParameter("lastActionNumber", jBean.getLastActionNumber());
455            q.setParameter("timeOut", jBean.getTimeout());
456            q.setParameter("timeZone", jBean.getTimeZone());
457            q.setParameter("authToken", jBean.getAuthToken());
458            q.setParameter("createdTime", jBean.getCreatedTimestamp());
459            q.setParameter("endTime", jBean.getEndTimestamp());
460            q.setParameter("execution", jBean.getExecution());
461            q.setParameter("jobXml", jBean.getJobXml());
462            q.setParameter("lastAction", jBean.getLastActionTimestamp());
463            q.setParameter("lastModifiedTime", new Date());
464            q.setParameter("nextMaterializedTime", jBean.getNextMaterializedTimestamp());
465            q.setParameter("origJobXml", jBean.getOrigJobXml());
466            q.setParameter("slaXml", jBean.getSlaXml());
467            q.setParameter("startTime", jBean.getStartTimestamp());
468            q.setParameter("status", jBean.getStatus().toString());
469            q.setParameter("timeUnit", jBean.getTimeUnitStr());
470        }
471    
472        private void setActionQueryParameters(CoordinatorActionBean aBean, Query q) {
473            q.setParameter("actionNumber", aBean.getActionNumber());
474            q.setParameter("actionXml", aBean.getActionXml());
475            q.setParameter("consoleUrl", aBean.getConsoleUrl());
476            q.setParameter("createdConf", aBean.getCreatedConf());
477            q.setParameter("errorCode", aBean.getErrorCode());
478            q.setParameter("errorMessage", aBean.getErrorMessage());
479            q.setParameter("externalStatus", aBean.getExternalStatus());
480            q.setParameter("missingDependencies", aBean.getMissingDependencies());
481            q.setParameter("runConf", aBean.getRunConf());
482            q.setParameter("timeOut", aBean.getTimeOut());
483            q.setParameter("trackerUri", aBean.getTrackerUri());
484            q.setParameter("type", aBean.getType());
485            q.setParameter("createdTime", aBean.getCreatedTimestamp());
486            q.setParameter("externalId", aBean.getExternalId());
487            q.setParameter("jobId", aBean.getJobId());
488            q.setParameter("lastModifiedTime", new Date());
489            q.setParameter("nominalTime", aBean.getNominalTimestamp());
490            q.setParameter("slaXml", aBean.getSlaXml());
491            q.setParameter("status", aBean.getStatus().toString());
492        }
493    
494    
495        /**
496         * Purge the coordinators completed older than given days.
497         *
498         * @param olderThanDays number of days for which to preserve the coordinators
499         * @param limit maximum number of coordinator jobs to be purged
500         * @throws StoreException
501         */
502        public void purge(final long olderThanDays, final int limit) throws StoreException {
503            doOperation("coord-purge", new Callable<Void>() {
504                public Void call() throws SQLException, StoreException, WorkflowException {
505                    Timestamp lastModTm = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
506                    Query jobQ = entityManager.createNamedQuery("GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS");
507                    jobQ.setParameter("lastModTime", lastModTm);
508                    jobQ.setMaxResults(limit);
509                    List<CoordinatorJobBean> coordJobs = jobQ.getResultList();
510    
511                    int actionDeleted = 0;
512                    if (coordJobs.size() != 0) {
513                        for (CoordinatorJobBean coord : coordJobs) {
514                            String jobId = coord.getId();
515                            entityManager.remove(coord);
516                            Query g = entityManager.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR");
517                            g.setParameter("jobId", jobId);
518                            actionDeleted += g.executeUpdate();
519                        }
520                    }
521    
522                    XLog.getLog(getClass()).debug("ENDED Coord Purge deleted jobs :" + coordJobs.size() + " and actions " + actionDeleted);
523                    return null;
524                }
525            });
526        }
527    
528        public void commit() throws StoreException {
529        }
530    
531        public void close() throws StoreException {
532        }
533    
534        public CoordinatorJobBean getCoordinatorJobs(String id) {
535            // TODO Auto-generated method stub
536            return null;
537        }
538    
539        public CoordinatorJobInfo getCoordinatorInfo(final Map<String, List<String>> filter, final int start, final int len)
540                throws StoreException {
541    
542            CoordinatorJobInfo coordJobInfo = doOperation("getCoordinatorJobInfo", new Callable<CoordinatorJobInfo>() {
543                public CoordinatorJobInfo call() throws SQLException, StoreException {
544                    List<String> orArray = new ArrayList<String>();
545                    List<String> colArray = new ArrayList<String>();
546                    List<String> valArray = new ArrayList<String>();
547                    StringBuilder sb = new StringBuilder("");
548    
549                    StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr,
550                                             StoreStatusFilter.coordCountStr);
551    
552                    int realLen = 0;
553    
554                    Query q = null;
555                    Query qTotal = null;
556                    if (orArray.size() == 0) {
557                        q = entityManager.createNamedQuery("GET_COORD_JOBS_COLUMNS");
558                        q.setFirstResult(start - 1);
559                        q.setMaxResults(len);
560                        qTotal = entityManager.createNamedQuery("GET_COORD_JOBS_COUNT");
561                    }
562                    else {
563                        StringBuilder sbTotal = new StringBuilder(sb);
564                        sb.append(" order by w.createdTimestamp desc ");
565                        XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
566                        q = entityManager.createQuery(sb.toString());
567                        q.setFirstResult(start - 1);
568                        q.setMaxResults(len);
569                        qTotal = entityManager.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr,
570                                                                                      StoreStatusFilter.coordCountStr));
571                    }
572    
573                    for (int i = 0; i < orArray.size(); i++) {
574                        q.setParameter(colArray.get(i), valArray.get(i));
575                        qTotal.setParameter(colArray.get(i), valArray.get(i));
576                    }
577    
578                    OpenJPAQuery kq = OpenJPAPersistence.cast(q);
579                    JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
580                    fetch.setFetchBatchSize(20);
581                    fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
582                    fetch.setFetchDirection(FetchDirection.FORWARD);
583                    fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
584                    List<?> resultList = q.getResultList();
585                    List<Object[]> objectArrList = (List<Object[]>) resultList;
586                    List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>();
587    
588                    for (Object[] arr : objectArrList) {
589                        CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr);
590                        coordBeansList.add(ww);
591                    }
592    
593                    realLen = ((Long) qTotal.getSingleResult()).intValue();
594    
595                    return new CoordinatorJobInfo(coordBeansList, start, len, realLen);
596                }
597            });
598            return coordJobInfo;
599        }
600    
601        private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) {
602            CoordinatorJobBean bean = new CoordinatorJobBean();
603            bean.setId((String) arr[0]);
604            if (arr[1] != null) {
605                bean.setAppName((String) arr[1]);
606            }
607            if (arr[2] != null) {
608                bean.setStatus(Status.valueOf((String) arr[2]));
609            }
610            if (arr[3] != null) {
611                bean.setUser((String) arr[3]);
612            }
613            if (arr[4] != null) {
614                bean.setGroup((String) arr[4]);
615            }
616            if (arr[5] != null) {
617                bean.setStartTime((Timestamp) arr[5]);
618            }
619            if (arr[6] != null) {
620                bean.setEndTime((Timestamp) arr[6]);
621            }
622            if (arr[7] != null) {
623                bean.setAppPath((String) arr[7]);
624            }
625            if (arr[8] != null) {
626                bean.setConcurrency(((Integer) arr[8]).intValue());
627            }
628            if (arr[9] != null) {
629                bean.setFrequency(((Integer) arr[9]).intValue());
630            }
631            if (arr[10] != null) {
632                bean.setLastActionTime((Timestamp) arr[10]);
633            }
634            if (arr[11] != null) {
635                bean.setNextMaterializedTime((Timestamp) arr[11]);
636            }
637            if (arr[13] != null) {
638                bean.setTimeUnit(Timeunit.valueOf((String) arr[13]));
639            }
640            if (arr[14] != null) {
641                bean.setTimeZone((String) arr[14]);
642            }
643            if (arr[15] != null) {
644                bean.setTimeout((Integer) arr[15]);
645            }
646            return bean;
647        }
648    
649        /**
650         * Loads all actions for the given Coordinator job.
651         *
652         * @param jobId coordinator job id
653         * @param locking true if Actions are to be locked
654         * @return A List of CoordinatorActionBean
655         * @throws StoreException
656         */
657        public List<CoordinatorActionBean> getActionsForCoordinatorJob(final String jobId, final boolean locking)
658                throws StoreException {
659            ParamChecker.notEmpty(jobId, "CoordinatorJobID");
660            List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob",
661                                                              new Callable<List<CoordinatorActionBean>>() {
662                                                                  @SuppressWarnings("unchecked")
663                                                                  public List<CoordinatorActionBean> call() throws StoreException {
664                                                                      List<CoordinatorActionBean> actions;
665                                                                      List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
666                                                                      try {
667                                                                          Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
668                                                                          q.setParameter("jobId", jobId);
669                                                                          /*
670                                                                          * if (locking) { //
671                                                                          * q.setHint("openjpa.FetchPlan.ReadLockMode", //
672                                                                          * "READ"); OpenJPAQuery oq =
673                                                                          * OpenJPAPersistence.cast(q); JDBCFetchPlan fetch =
674                                                                          * (JDBCFetchPlan) oq.getFetchPlan();
675                                                                          * fetch.setReadLockMode(LockModeType.WRITE);
676                                                                          * fetch.setLockTimeout(-1); // 1 second }
677                                                                          */
678                                                                          actions = q.getResultList();
679                                                                          for (CoordinatorActionBean a : actions) {
680                                                                              CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
681                                                                              actionList.add(aa);
682                                                                          }
683                                                                      }
684                                                                      catch (IllegalStateException e) {
685                                                                          throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
686                                                                      }
687                                                                      /*
688                                                                      * if (locking) { return actions; } else {
689                                                                      */
690                                                                      return actionList;
691                                                                      // }
692                                                                  }
693                                                              });
694            return actions;
695        }
696    
697        /**
698         * Loads given number of actions for the given Coordinator job.
699         *
700         * @param jobId coordinator job id
701         * @param start offset for select statement
702         * @param len number of Workflow Actions to be returned
703         * @return A List of CoordinatorActionBean
704         * @throws StoreException
705         */
706        public List<CoordinatorActionBean> getActionsSubsetForCoordinatorJob(final String jobId, final int start,
707                                                                             final int len) throws StoreException {
708            ParamChecker.notEmpty(jobId, "CoordinatorJobID");
709            List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob",
710                                                              new Callable<List<CoordinatorActionBean>>() {
711                                                                  @SuppressWarnings("unchecked")
712                                                                  public List<CoordinatorActionBean> call() throws StoreException {
713                                                                      List<CoordinatorActionBean> actions;
714                                                                      List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
715                                                                      try {
716                                                                          Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
717                                                                          q.setParameter("jobId", jobId);
718                                                                          q.setFirstResult(start - 1);
719                                                                          q.setMaxResults(len);
720                                                                          actions = q.getResultList();
721                                                                          for (CoordinatorActionBean a : actions) {
722                                                                              CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
723                                                                              actionList.add(aa);
724                                                                          }
725                                                                      }
726                                                                      catch (IllegalStateException e) {
727                                                                          throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
728                                                                      }
729                                                                      return actionList;
730                                                                  }
731                                                              });
732            return actions;
733        }
734    
735        protected CoordinatorActionBean getBeanForRunningCoordAction(CoordinatorActionBean a) {
736            if (a != null) {
737                CoordinatorActionBean action = new CoordinatorActionBean();
738                action.setId(a.getId());
739                action.setActionNumber(a.getActionNumber());
740                action.setActionXml(a.getActionXml());
741                action.setConsoleUrl(a.getConsoleUrl());
742                action.setCreatedConf(a.getCreatedConf());
743                //action.setErrorCode(a.getErrorCode());
744                //action.setErrorMessage(a.getErrorMessage());
745                action.setExternalStatus(a.getExternalStatus());
746                action.setMissingDependencies(a.getMissingDependencies());
747                action.setRunConf(a.getRunConf());
748                action.setTimeOut(a.getTimeOut());
749                action.setTrackerUri(a.getTrackerUri());
750                action.setType(a.getType());
751                action.setCreatedTime(a.getCreatedTime());
752                action.setExternalId(a.getExternalId());
753                action.setJobId(a.getJobId());
754                action.setLastModifiedTime(a.getLastModifiedTime());
755                action.setNominalTime(a.getNominalTime());
756                action.setSlaXml(a.getSlaXml());
757                action.setStatus(a.getStatus());
758                return action;
759            }
760            return null;
761        }
762    
763        public CoordinatorActionBean getAction(String id, boolean b) {
764            return null;
765        }
766    
767        /*
768         * do not need this public void updateCoordinatorActionForExternalId(final
769         * CoordinatorActionBean action) throws StoreException { // TODO
770         * Auto-generated method stub ParamChecker.notNull(action,
771         * "updateCoordinatorActionForExternalId");
772         * doOperation("updateCoordinatorActionForExternalId", new Callable<Void>()
773         * { public Void call() throws SQLException, StoreException,
774         * WorkflowException { Query q =
775         * entityManager.createNamedQuery("UPDATE_COORD_ACTION_FOR_EXTERNALID");
776         * setActionQueryParameters(action,q); q.executeUpdate(); return null; } });
777         * }
778         */
779        public CoordinatorActionBean getCoordinatorActionForExternalId(final String externalId) throws StoreException {
780            // TODO Auto-generated method stub
781            ParamChecker.notEmpty(externalId, "coodinatorActionExternalId");
782            CoordinatorActionBean cBean = doOperation("getCoordinatorActionForExternalId",
783                                                      new Callable<CoordinatorActionBean>() {
784                                                          public CoordinatorActionBean call() throws StoreException {
785                                                              CoordinatorActionBean caBean = null;
786                                                              Query q = entityManager.createNamedQuery("GET_COORD_ACTION_FOR_EXTERNALID");
787                                                              q.setParameter("externalId", externalId);
788                                                              List<CoordinatorActionBean> actionList = q.getResultList();
789                                                              if (actionList.size() > 0) {
790                                                                  caBean = actionList.get(0);
791                                                              }
792                                                              return caBean;
793                                                          }
794                                                      });
795            return cBean;
796        }
797    
798        public List<CoordinatorActionBean> getRunningActionsForCoordinatorJob(final String jobId, final boolean locking)
799                throws StoreException {
800            ParamChecker.notEmpty(jobId, "CoordinatorJobID");
801            List<CoordinatorActionBean> actions = doOperation("getRunningActionsForCoordinatorJob",
802                                                              new Callable<List<CoordinatorActionBean>>() {
803                                                                  @SuppressWarnings("unchecked")
804                                                                  public List<CoordinatorActionBean> call() throws StoreException {
805                                                                      List<CoordinatorActionBean> actions;
806                                                                      try {
807                                                                          Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_FOR_COORD_JOB");
808                                                                          q.setParameter("jobId", jobId);
809                                                                          /*
810                                                                          * if (locking) {
811                                                                          * q.setHint("openjpa.FetchPlan.ReadLockMode",
812                                                                          * "READ"); OpenJPAQuery oq =
813                                                                          * OpenJPAPersistence.cast(q); FetchPlan fetch =
814                                                                          * oq.getFetchPlan();
815                                                                          * fetch.setReadLockMode(LockModeType.WRITE);
816                                                                          * fetch.setLockTimeout(-1); // no limit }
817                                                                          */
818                                                                          actions = q.getResultList();
819                                                                          return actions;
820                                                                      }
821                                                                      catch (IllegalStateException e) {
822                                                                          throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
823                                                                      }
824                                                                  }
825                                                              });
826            return actions;
827        }
828    
829        public List<CoordinatorActionBean> getRunningActionsOlderThan(final long checkAgeSecs, final boolean locking)
830                throws StoreException {
831            List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
832                                                              new Callable<List<CoordinatorActionBean>>() {
833                                                                  @SuppressWarnings("unchecked")
834                                                                  public List<CoordinatorActionBean> call() throws StoreException {
835                                                                      List<CoordinatorActionBean> actions;
836                                                                      Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
837                                                                      try {
838                                                                          Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_OLDER_THAN");
839                                                                          q.setParameter("lastModifiedTime", ts);
840                                                                          /*
841                                                                          * if (locking) { OpenJPAQuery oq =
842                                                                          * OpenJPAPersistence.cast(q); FetchPlan fetch =
843                                                                          * oq.getFetchPlan();
844                                                                          * fetch.setReadLockMode(LockModeType.WRITE);
845                                                                          * fetch.setLockTimeout(-1); // no limit }
846                                                                          */
847                                                                          actions = q.getResultList();
848                                                                          return actions;
849                                                                      }
850                                                                      catch (IllegalStateException e) {
851                                                                          throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
852                                                                      }
853                                                                  }
854                                                              });
855            return actions;
856        }
857    
858        public List<CoordinatorActionBean> getRecoveryActionsOlderThan(final long checkAgeSecs, final boolean locking)
859                throws StoreException {
860            List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
861                                                              new Callable<List<CoordinatorActionBean>>() {
862                                                                  @SuppressWarnings("unchecked")
863                                                                  public List<CoordinatorActionBean> call() throws StoreException {
864                                                                      List<CoordinatorActionBean> actions;
865                                                                      try {
866                                                                          Query q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN");
867                                                                          Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
868                                                                          q.setParameter("lastModifiedTime", ts);
869                                                                          /*
870                                                                          * if (locking) { OpenJPAQuery oq =
871                                                                          * OpenJPAPersistence.cast(q); FetchPlan fetch =
872                                                                          * oq.getFetchPlan();
873                                                                          * fetch.setReadLockMode(LockModeType.WRITE);
874                                                                          * fetch.setLockTimeout(-1); // no limit }
875                                                                          */
876                                                                          actions = q.getResultList();
877                                                                          return actions;
878                                                                      }
879                                                                      catch (IllegalStateException e) {
880                                                                          throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
881                                                                      }
882                                                                  }
883                                                              });
884            return actions;
885        }
886    
887        /**
888         * Get coordinator action beans for given start date and end date
889         *
890         * @param startDate
891         * @param endDate
892         * @return list of coordinator action beans
893         * @throws StoreException
894         */
895        public List<CoordinatorActionBean> getCoordActionsForDates(final String jobId, final Date startDate,
896                final Date endDate)
897                throws StoreException {
898            List<CoordinatorActionBean> actions = doOperation("getCoordActionsForDates",
899                    new Callable<List<CoordinatorActionBean>>() {
900                        @SuppressWarnings("unchecked")
901                        public List<CoordinatorActionBean> call() throws StoreException {
902                            List<CoordinatorActionBean> actions;
903                            try {
904                                Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_DATES");
905                                q.setParameter("jobId", jobId);
906                                q.setParameter("startTime", new Timestamp(startDate.getTime()));
907                                q.setParameter("endTime", new Timestamp(endDate.getTime()));
908                                actions = q.getResultList();
909    
910                                List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
911                                for (CoordinatorActionBean a : actions) {
912                                    CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
913                                    actionList.add(aa);
914                                }
915                                return actionList;
916                            }
917                            catch (IllegalStateException e) {
918                                throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
919                            }
920                        }
921                    });
922            return actions;
923        }
924    
925        /**
926         * Get coordinator action bean for given date
927         *
928         * @param nominalTime
929         * @return CoordinatorActionBean
930         * @throws StoreException
931         */
932        public CoordinatorActionBean getCoordActionForNominalTime(final String jobId, final Date nominalTime)
933                throws StoreException {
934            CoordinatorActionBean action = doOperation("getCoordActionForNominalTime",
935                    new Callable<CoordinatorActionBean>() {
936                @SuppressWarnings("unchecked")
937                public CoordinatorActionBean call() throws StoreException {
938                    List<CoordinatorActionBean> actions;
939                    Query q = entityManager.createNamedQuery("GET_ACTION_FOR_NOMINALTIME");
940                    q.setParameter("jobId", jobId);
941                    q.setParameter("nominalTime", new Timestamp(nominalTime.getTime()));
942                    actions = q.getResultList();
943    
944                    CoordinatorActionBean action = null;
945                    if (actions.size() > 0) {
946                        action = actions.get(0);
947                    }
948                    else {
949                        throw new StoreException(ErrorCode.E0605, DateUtils.convertDateToString(nominalTime));
950                    }
951                    return getBeanForRunningCoordAction(action);
952                }
953            });
954            return action;
955        }
956    
957        public List<String> getRecoveryActionsGroupByJobId(final long checkAgeSecs) throws StoreException {
958            List<String> jobids = doOperation("getRecoveryActionsGroupByJobId", new Callable<List<String>>() {
959                @SuppressWarnings("unchecked")
960                public List<String> call() throws StoreException {
961                    List<String> jobids = new ArrayList<String>();
962                    try {
963                        Query q = entityManager.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID");
964                        Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
965                        q.setParameter(1, ts);
966                        List<Object[]> list = q.getResultList();
967    
968                        for (Object[] arr : list) {
969                            if (arr != null && arr[0] != null) {
970                                jobids.add((String) arr[0]);
971                            }
972                        }
973    
974                        return jobids;
975                    }
976                    catch (IllegalStateException e) {
977                        throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
978                    }
979                }
980            });
981            return jobids;
982        }
983    }