001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.store;
019    
020    import java.sql.SQLException;
021    import java.sql.Timestamp;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.concurrent.Callable;
027    
028    import javax.persistence.EntityManager;
029    import javax.persistence.Query;
030    
031    import org.apache.oozie.CoordinatorActionBean;
032    import org.apache.oozie.CoordinatorJobBean;
033    import org.apache.oozie.CoordinatorJobInfo;
034    import org.apache.oozie.ErrorCode;
035    import org.apache.oozie.client.Job.Status;
036    import org.apache.oozie.client.CoordinatorJob.Timeunit;
037    import org.apache.oozie.service.InstrumentationService;
038    import org.apache.oozie.service.Services;
039    import org.apache.oozie.util.DateUtils;
040    import org.apache.oozie.util.Instrumentation;
041    import org.apache.oozie.util.ParamChecker;
042    import org.apache.oozie.util.XLog;
043    import org.apache.oozie.workflow.WorkflowException;
044    import org.apache.openjpa.persistence.OpenJPAPersistence;
045    import org.apache.openjpa.persistence.OpenJPAQuery;
046    import org.apache.openjpa.persistence.jdbc.FetchDirection;
047    import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
048    import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
049    import org.apache.openjpa.persistence.jdbc.ResultSetType;
050    
051    /**
052     * DB Implementation of Coord Store
053     */
054    public class CoordinatorStore extends Store {
055        private final XLog log = XLog.getLog(getClass());
056    
057        private EntityManager entityManager;
058        private static final String INSTR_GROUP = "db";
059        public static final int LOCK_TIMEOUT = 50000;
060        private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
061    
062        public CoordinatorStore(boolean selectForUpdate) throws StoreException {
063            super();
064            entityManager = getEntityManager();
065        }
066    
067        public CoordinatorStore(Store store, boolean selectForUpdate) throws StoreException {
068            super(store);
069            entityManager = getEntityManager();
070        }
071    
072        /**
073         * Create a CoordJobBean. It also creates the process instance for the job.
074         *
075         * @param workflow workflow bean
076         * @throws StoreException
077         */
078    
079        public void insertCoordinatorJob(final CoordinatorJobBean coordinatorJob) throws StoreException {
080            ParamChecker.notNull(coordinatorJob, "coordinatorJob");
081    
082            doOperation("insertCoordinatorJob", new Callable<Void>() {
083                public Void call() throws StoreException {
084                    entityManager.persist(coordinatorJob);
085                    return null;
086                }
087            });
088        }
089    
090        /**
091         * Load the CoordinatorJob into a Bean and return it. Also load the Workflow Instance into the bean. And lock the
092         * Workflow depending on the locking parameter.
093         *
094         * @param id Job ID
095         * @param locking Flag for Table Lock
096         * @return CoordinatorJobBean
097         * @throws StoreException
098         */
099        public CoordinatorJobBean getCoordinatorJob(final String id, final boolean locking) throws StoreException {
100            ParamChecker.notEmpty(id, "CoordJobId");
101            CoordinatorJobBean cjBean = doOperation("getCoordinatorJob", new Callable<CoordinatorJobBean>() {
102                @SuppressWarnings("unchecked")
103                public CoordinatorJobBean call() throws StoreException {
104                    Query q = entityManager.createNamedQuery("GET_COORD_JOB");
105                    q.setParameter("id", id);
106                    /*
107                     * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
108                     * // q.setHint("openjpa.FetchPlan.ReadLockMode","WRITE");
109                     * FetchPlan fetch = oq.getFetchPlan();
110                     * fetch.setReadLockMode(LockModeType.WRITE);
111                     * fetch.setLockTimeout(-1); // 1 second }
112                     */
113                    List<CoordinatorJobBean> cjBeans = q.getResultList();
114    
115                    if (cjBeans.size() > 0) {
116                        return cjBeans.get(0);
117                    }
118                    else {
119                        throw new StoreException(ErrorCode.E0604, id);
120                    }
121                }
122            });
123    
124            cjBean.setStatus(cjBean.getStatus());
125            return cjBean;
126        }
127    
128        /**
129         * Get a list of Coordinator Jobs that should be materialized. Jobs with a 'last materialized time' older than the
130         * argument will be returned.
131         *
132         * @param d Date
133         * @return List of Coordinator Jobs that have a last materialized time older than input date
134         * @throws StoreException
135         */
136        public List<CoordinatorJobBean> getCoordinatorJobsToBeMaterialized(final Date d, final int limit)
137                throws StoreException {
138    
139            ParamChecker.notNull(d, "Coord Job Materialization Date");
140            List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsToBeMaterialized",
141                                                                                      new Callable<List<CoordinatorJobBean>>() {
142                                                                                          public List<CoordinatorJobBean> call() throws StoreException {
143    
144                                                                                              List<CoordinatorJobBean> cjBeans;
145                                                                                              List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
146                                                                                              try {
147                                                                                                  Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN");
148                                                                                                  q.setParameter("matTime", new Timestamp(d.getTime()));
149                                                                                                  if (limit > 0) {
150                                                                                                      q.setMaxResults(limit);
151                                                                                                  }
152                                                                                                  /*
153                                                                                                  OpenJPAQuery oq = OpenJPAPersistence.cast(q);
154                                                                                                  FetchPlan fetch = oq.getFetchPlan();
155                                                                                                  fetch.setReadLockMode(LockModeType.WRITE);
156                                                                                                  fetch.setLockTimeout(-1); // no limit
157                                                                                                  */
158                                                                                                  cjBeans = q.getResultList();
159                                                                                                  // copy results to a new object
160                                                                                                  for (CoordinatorJobBean j : cjBeans) {
161                                                                                                      jobList.add(j);
162                                                                                                  }
163                                                                                              }
164                                                                                              catch (IllegalStateException e) {
165                                                                                                  throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
166                                                                                              }
167                                                                                              return jobList;
168    
169                                                                                          }
170                                                                                      });
171            return cjBeans;
172        }
173    
174        /**
175         * A list of Coordinator Jobs that are matched with the status and have last materialized time' older than
176         * checkAgeSecs will be returned.
177         *
178         * @param checkAgeSecs Job age in Seconds
179         * @param status Coordinator Job Status
180         * @param limit Number of results to return
181         * @param locking Flag for Table Lock
182         * @return List of Coordinator Jobs that are matched with the parameters.
183         * @throws StoreException
184         */
185        public List<CoordinatorJobBean> getCoordinatorJobsOlderThanStatus(final long checkAgeSecs, final String status,
186                                                                          final int limit, final boolean locking) throws StoreException {
187    
188            ParamChecker.notNull(status, "Coord Job Status");
189            List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsOlderThanStatus",
190                                                                                      new Callable<List<CoordinatorJobBean>>() {
191                                                                                          public List<CoordinatorJobBean> call() throws StoreException {
192    
193                                                                                              List<CoordinatorJobBean> cjBeans;
194                                                                                              List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
195                                                                                              try {
196                                                                                                  Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN_STATUS");
197                                                                                                  Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
198                                                                                                  q.setParameter("lastModTime", ts);
199                                                                                                  q.setParameter("status", status);
200                                                                                                  if (limit > 0) {
201                                                                                                      q.setMaxResults(limit);
202                                                                                                  }
203                                                                                                  /*
204                                                                                                  * if (locking) { OpenJPAQuery oq =
205                                                                                                  * OpenJPAPersistence.cast(q); FetchPlan fetch =
206                                                                                                  * oq.getFetchPlan();
207                                                                                                  * fetch.setReadLockMode(LockModeType.WRITE);
208                                                                                                  * fetch.setLockTimeout(-1); // no limit }
209                                                                                                  */
210                                                                                                  cjBeans = q.getResultList();
211                                                                                                  for (CoordinatorJobBean j : cjBeans) {
212                                                                                                      jobList.add(j);
213                                                                                                  }
214                                                                                              }
215                                                                                              catch (Exception e) {
216                                                                                                  throw new StoreException(ErrorCode.E0603, e.getMessage(), e);
217                                                                                              }
218                                                                                              return jobList;
219    
220                                                                                          }
221                                                                                      });
222            return cjBeans;
223        }
224    
225        /**
226         * Load the CoordinatorAction into a Bean and return it.
227         *
228         * @param id action ID
229         * @return CoordinatorActionBean
230         * @throws StoreException
231         */
232        public CoordinatorActionBean getCoordinatorAction(final String id, final boolean locking) throws StoreException {
233            ParamChecker.notEmpty(id, "actionID");
234            CoordinatorActionBean caBean = doOperation("getCoordinatorAction", new Callable<CoordinatorActionBean>() {
235                public CoordinatorActionBean call() throws StoreException {
236                    Query q = entityManager.createNamedQuery("GET_COORD_ACTION");
237                    q.setParameter("id", id);
238                    OpenJPAQuery oq = OpenJPAPersistence.cast(q);
239                    /*
240                     * if (locking) { //q.setHint("openjpa.FetchPlan.ReadLockMode",
241                     * "WRITE"); FetchPlan fetch = oq.getFetchPlan();
242                     * fetch.setReadLockMode(LockModeType.WRITE);
243                     * fetch.setLockTimeout(-1); // no limit }
244                     */
245    
246                    CoordinatorActionBean action = null;
247                    List<CoordinatorActionBean> actions = q.getResultList();
248                    if (actions.size() > 0) {
249                        action = actions.get(0);
250                    }
251                    else {
252                        throw new StoreException(ErrorCode.E0605, id);
253                    }
254    
255                    /*
256                     * if (locking) return action; else
257                     */
258                    return getBeanForRunningCoordAction(action);
259                }
260            });
261            return caBean;
262        }
263    
264        /**
265         * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
266         * concurrency number. Sort returned actions based on execution order (FIFO, LIFO, LAST_ONLY)
267         *
268         * @param id job ID
269         * @param numResults number of results to return
270         * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY
271         * @return List of CoordinatorActionBean
272         * @throws StoreException
273         */
274        public List<CoordinatorActionBean> getCoordinatorActionsForJob(final String id, final int numResults,
275                                                                       final String executionOrder) throws StoreException {
276            ParamChecker.notEmpty(id, "jobID");
277            List<CoordinatorActionBean> caBeans = doOperation("getCoordinatorActionsForJob",
278                                                              new Callable<List<CoordinatorActionBean>>() {
279                                                                  public List<CoordinatorActionBean> call() throws StoreException {
280    
281                                                                      List<CoordinatorActionBean> caBeans;
282                                                                      Query q;
283                                                                      // check if executionOrder is FIFO, LIFO, or LAST_ONLY
284                                                                      if (executionOrder.equalsIgnoreCase("FIFO")) {
285                                                                          q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
286                                                                      }
287                                                                      else {
288                                                                          q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO");
289                                                                      }
290                                                                      q.setParameter("jobId", id);
291                                                                      // if executionOrder is LAST_ONLY, only retrieve first
292                                                                      // record in LIFO,
293                                                                      // otherwise, use numResults if it is positive.
294                                                                      if (executionOrder.equalsIgnoreCase("LAST_ONLY")) {
295                                                                          q.setMaxResults(1);
296                                                                      }
297                                                                      else {
298                                                                          if (numResults > 0) {
299                                                                              q.setMaxResults(numResults);
300                                                                          }
301                                                                      }
302                                                                      caBeans = q.getResultList();
303                                                                      return caBeans;
304                                                                  }
305                                                              });
306            return caBeans;
307        }
308    
309        /**
310         * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
311         * concurrency number.
312         *
313         * @param id job ID
314         * @return Number of running actions
315         * @throws StoreException
316         */
317        public int getCoordinatorRunningActionsCount(final String id) throws StoreException {
318            ParamChecker.notEmpty(id, "jobID");
319            Integer cnt = doOperation("getCoordinatorRunningActionsCount", new Callable<Integer>() {
320                public Integer call() throws SQLException {
321    
322                    Query q = entityManager.createNamedQuery("GET_COORD_RUNNING_ACTIONS_COUNT");
323    
324                    q.setParameter("jobId", id);
325                    Long count = (Long) q.getSingleResult();
326                    return Integer.valueOf(count.intValue());
327                }
328            });
329            return cnt.intValue();
330        }
331    
332        /**
333         * Create a new Action record in the ACTIONS table with the given Bean.
334         *
335         * @param action WorkflowActionBean
336         * @throws StoreException If the action is already present
337         */
338        public void insertCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
339            ParamChecker.notNull(action, "CoordinatorActionBean");
340            doOperation("insertCoordinatorAction", new Callable<Void>() {
341                public Void call() throws StoreException {
342                    entityManager.persist(action);
343                    return null;
344                }
345            });
346        }
347    
348        /**
349         * Update the given action bean to DB.
350         *
351         * @param action Action Bean
352         * @throws StoreException if action doesn't exist
353         */
354        public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
355            ParamChecker.notNull(action, "CoordinatorActionBean");
356            doOperation("updateCoordinatorAction", new Callable<Void>() {
357                public Void call() throws StoreException {
358                    Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION");
359                    q.setParameter("id", action.getId());
360                    setActionQueryParameters(action, q);
361                    q.executeUpdate();
362                    return null;
363                }
364            });
365        }
366    
367        /**
368         * Update the given action bean to DB.
369         *
370         * @param action Action Bean
371         * @throws StoreException if action doesn't exist
372         */
373        public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException {
374            ParamChecker.notNull(action, "CoordinatorActionBean");
375            doOperation("updateCoordinatorAction", new Callable<Void>() {
376                public Void call() throws StoreException {
377                    Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION_MIN");
378                    q.setParameter("id", action.getId());
379                    q.setParameter("missingDependencies", action.getMissingDependencies());
380                    q.setParameter("lastModifiedTime", new Date());
381                    q.setParameter("status", action.getStatus().toString());
382                    q.setParameter("actionXml", action.getActionXml());
383                    q.executeUpdate();
384                    return null;
385                }
386            });
387        }
388    
389        /**
390         * Update the given coordinator job bean to DB.
391         *
392         * @param jobbean Coordinator Job Bean
393         * @throws StoreException if action doesn't exist
394         */
395        public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException {
396            ParamChecker.notNull(job, "CoordinatorJobBean");
397            doOperation("updateJob", new Callable<Void>() {
398                public Void call() throws StoreException {
399                    Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB");
400                    q.setParameter("id", job.getId());
401                    setJobQueryParameters(job, q);
402                    q.executeUpdate();
403                    return null;
404                }
405            });
406        }
407    
408        public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException {
409            ParamChecker.notNull(job, "CoordinatorJobBean");
410            doOperation("updateJobStatus", new Callable<Void>() {
411                public Void call() throws StoreException {
412                    Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB_STATUS");
413                    q.setParameter("id", job.getId());
414                    q.setParameter("status", job.getStatus().toString());
415                    q.setParameter("lastModifiedTime", new Date());
416                    q.executeUpdate();
417                    return null;
418                }
419            });
420        }
421    
422        private <V> V doOperation(String name, Callable<V> command) throws StoreException {
423            try {
424                Instrumentation.Cron cron = new Instrumentation.Cron();
425                cron.start();
426                V retVal;
427                try {
428                    retVal = command.call();
429                }
430                finally {
431                    cron.stop();
432                }
433                Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
434                return retVal;
435            }
436            catch (StoreException ex) {
437                throw ex;
438            }
439            catch (SQLException ex) {
440                throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
441            }
442            catch (Exception e) {
443                throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
444            }
445        }
446    
447        private void setJobQueryParameters(CoordinatorJobBean jBean, Query q) {
448            q.setParameter("appName", jBean.getAppName());
449            q.setParameter("appPath", jBean.getAppPath());
450            q.setParameter("concurrency", jBean.getConcurrency());
451            q.setParameter("conf", jBean.getConf());
452            q.setParameter("externalId", jBean.getExternalId());
453            q.setParameter("frequency", jBean.getFrequency());
454            q.setParameter("lastActionNumber", jBean.getLastActionNumber());
455            q.setParameter("timeOut", jBean.getTimeout());
456            q.setParameter("timeZone", jBean.getTimeZone());
457            q.setParameter("createdTime", jBean.getCreatedTimestamp());
458            q.setParameter("endTime", jBean.getEndTimestamp());
459            q.setParameter("execution", jBean.getExecution());
460            q.setParameter("jobXml", jBean.getJobXml());
461            q.setParameter("lastAction", jBean.getLastActionTimestamp());
462            q.setParameter("lastModifiedTime", new Date());
463            q.setParameter("nextMaterializedTime", jBean.getNextMaterializedTimestamp());
464            q.setParameter("origJobXml", jBean.getOrigJobXml());
465            q.setParameter("slaXml", jBean.getSlaXml());
466            q.setParameter("startTime", jBean.getStartTimestamp());
467            q.setParameter("status", jBean.getStatus().toString());
468            q.setParameter("timeUnit", jBean.getTimeUnitStr());
469        }
470    
471        private void setActionQueryParameters(CoordinatorActionBean aBean, Query q) {
472            q.setParameter("actionNumber", aBean.getActionNumber());
473            q.setParameter("actionXml", aBean.getActionXml());
474            q.setParameter("consoleUrl", aBean.getConsoleUrl());
475            q.setParameter("createdConf", aBean.getCreatedConf());
476            q.setParameter("errorCode", aBean.getErrorCode());
477            q.setParameter("errorMessage", aBean.getErrorMessage());
478            q.setParameter("externalStatus", aBean.getExternalStatus());
479            q.setParameter("missingDependencies", aBean.getMissingDependencies());
480            q.setParameter("runConf", aBean.getRunConf());
481            q.setParameter("timeOut", aBean.getTimeOut());
482            q.setParameter("trackerUri", aBean.getTrackerUri());
483            q.setParameter("type", aBean.getType());
484            q.setParameter("createdTime", aBean.getCreatedTimestamp());
485            q.setParameter("externalId", aBean.getExternalId());
486            q.setParameter("jobId", aBean.getJobId());
487            q.setParameter("lastModifiedTime", new Date());
488            q.setParameter("nominalTime", aBean.getNominalTimestamp());
489            q.setParameter("slaXml", aBean.getSlaXml());
490            q.setParameter("status", aBean.getStatus().toString());
491        }
492    
493    
494        /**
495         * Purge the coordinators completed older than given days.
496         *
497         * @param olderThanDays number of days for which to preserve the coordinators
498         * @param limit maximum number of coordinator jobs to be purged
499         * @throws StoreException
500         */
501        public void purge(final long olderThanDays, final int limit) throws StoreException {
502            doOperation("coord-purge", new Callable<Void>() {
503                public Void call() throws SQLException, StoreException, WorkflowException {
504                    Timestamp lastModTm = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
505                    Query jobQ = entityManager.createNamedQuery("GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS");
506                    jobQ.setParameter("lastModTime", lastModTm);
507                    jobQ.setMaxResults(limit);
508                    List<CoordinatorJobBean> coordJobs = jobQ.getResultList();
509    
510                    int actionDeleted = 0;
511                    if (coordJobs.size() != 0) {
512                        for (CoordinatorJobBean coord : coordJobs) {
513                            String jobId = coord.getId();
514                            entityManager.remove(coord);
515                            Query g = entityManager.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR");
516                            g.setParameter("jobId", jobId);
517                            actionDeleted += g.executeUpdate();
518                        }
519                    }
520    
521                    XLog.getLog(getClass()).debug("ENDED Coord Purge deleted jobs :" + coordJobs.size() + " and actions " + actionDeleted);
522                    return null;
523                }
524            });
525        }
526    
527        public void commit() throws StoreException {
528        }
529    
530        public void close() throws StoreException {
531        }
532    
533        public CoordinatorJobBean getCoordinatorJobs(String id) {
534            // TODO Auto-generated method stub
535            return null;
536        }
537    
538        public CoordinatorJobInfo getCoordinatorInfo(final Map<String, List<String>> filter, final int start, final int len)
539                throws StoreException {
540    
541            CoordinatorJobInfo coordJobInfo = doOperation("getCoordinatorJobInfo", new Callable<CoordinatorJobInfo>() {
542                public CoordinatorJobInfo call() throws SQLException, StoreException {
543                    List<String> orArray = new ArrayList<String>();
544                    List<String> colArray = new ArrayList<String>();
545                    List<String> valArray = new ArrayList<String>();
546                    StringBuilder sb = new StringBuilder("");
547    
548                    StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr,
549                                             StoreStatusFilter.coordCountStr);
550    
551                    int realLen = 0;
552    
553                    Query q = null;
554                    Query qTotal = null;
555                    if (orArray.size() == 0) {
556                        q = entityManager.createNamedQuery("GET_COORD_JOBS_COLUMNS");
557                        q.setFirstResult(start - 1);
558                        q.setMaxResults(len);
559                        qTotal = entityManager.createNamedQuery("GET_COORD_JOBS_COUNT");
560                    }
561                    else {
562                        StringBuilder sbTotal = new StringBuilder(sb);
563                        sb.append(" order by w.createdTimestamp desc ");
564                        XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
565                        q = entityManager.createQuery(sb.toString());
566                        q.setFirstResult(start - 1);
567                        q.setMaxResults(len);
568                        qTotal = entityManager.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr,
569                                                                                      StoreStatusFilter.coordCountStr));
570                    }
571    
572                    for (int i = 0; i < orArray.size(); i++) {
573                        q.setParameter(colArray.get(i), valArray.get(i));
574                        qTotal.setParameter(colArray.get(i), valArray.get(i));
575                    }
576    
577                    OpenJPAQuery kq = OpenJPAPersistence.cast(q);
578                    JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
579                    fetch.setFetchBatchSize(20);
580                    fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
581                    fetch.setFetchDirection(FetchDirection.FORWARD);
582                    fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
583                    List<?> resultList = q.getResultList();
584                    List<Object[]> objectArrList = (List<Object[]>) resultList;
585                    List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>();
586    
587                    for (Object[] arr : objectArrList) {
588                        CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr);
589                        coordBeansList.add(ww);
590                    }
591    
592                    realLen = ((Long) qTotal.getSingleResult()).intValue();
593    
594                    return new CoordinatorJobInfo(coordBeansList, start, len, realLen);
595                }
596            });
597            return coordJobInfo;
598        }
599    
600        private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) {
601            CoordinatorJobBean bean = new CoordinatorJobBean();
602            bean.setId((String) arr[0]);
603            if (arr[1] != null) {
604                bean.setAppName((String) arr[1]);
605            }
606            if (arr[2] != null) {
607                bean.setStatus(Status.valueOf((String) arr[2]));
608            }
609            if (arr[3] != null) {
610                bean.setUser((String) arr[3]);
611            }
612            if (arr[4] != null) {
613                bean.setGroup((String) arr[4]);
614            }
615            if (arr[5] != null) {
616                bean.setStartTime((Timestamp) arr[5]);
617            }
618            if (arr[6] != null) {
619                bean.setEndTime((Timestamp) arr[6]);
620            }
621            if (arr[7] != null) {
622                bean.setAppPath((String) arr[7]);
623            }
624            if (arr[8] != null) {
625                bean.setConcurrency(((Integer) arr[8]).intValue());
626            }
627            if (arr[9] != null) {
628                bean.setFrequency((String) arr[9]);
629            }
630            if (arr[10] != null) {
631                bean.setLastActionTime((Timestamp) arr[10]);
632            }
633            if (arr[11] != null) {
634                bean.setNextMaterializedTime((Timestamp) arr[11]);
635            }
636            if (arr[13] != null) {
637                bean.setTimeUnit(Timeunit.valueOf((String) arr[13]));
638            }
639            if (arr[14] != null) {
640                bean.setTimeZone((String) arr[14]);
641            }
642            if (arr[15] != null) {
643                bean.setTimeout((Integer) arr[15]);
644            }
645            return bean;
646        }
647    
648        /**
649         * Loads all actions for the given Coordinator job.
650         *
651         * @param jobId coordinator job id
652         * @param locking true if Actions are to be locked
653         * @return A List of CoordinatorActionBean
654         * @throws StoreException
655         */
656        public Integer getActionsForCoordinatorJob(final String jobId, final boolean locking)
657                throws StoreException {
658            ParamChecker.notEmpty(jobId, "CoordinatorJobID");
659            Integer actionsCount = doOperation("getActionsForCoordinatorJob",
660                                                              new Callable<Integer>() {
661                                                                  @SuppressWarnings("unchecked")
662                                                                  public Integer call() throws StoreException {
663                                                                      List<CoordinatorActionBean> actions;
664                                                                      List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
665                                                                      try {
666                                                                          Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
667                                                                          q.setParameter("jobId", jobId);
668                                                                          /*
669                                                                          * if (locking) { //
670                                                                          * q.setHint("openjpa.FetchPlan.ReadLockMode", //
671                                                                          * "READ"); OpenJPAQuery oq =
672                                                                          * OpenJPAPersistence.cast(q); JDBCFetchPlan fetch =
673                                                                          * (JDBCFetchPlan) oq.getFetchPlan();
674                                                                          * fetch.setReadLockMode(LockModeType.WRITE);
675                                                                          * fetch.setLockTimeout(-1); // 1 second }
676                                                                          */
677                                                                          Long count = (Long) q.getSingleResult();
678                                                                          return Integer.valueOf(count.intValue());
679                                                                          /*actions = q.getResultList();
680                                                                          for (CoordinatorActionBean a : actions) {
681                                                                              CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
682                                                                              actionList.add(aa);
683                                                                          }*/
684                                                                      }
685                                                                      catch (IllegalStateException e) {
686                                                                          throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
687                                                                      }
688                                                                      /*
689                                                                      * if (locking) { return actions; } else {
690                                                                      */
691    
692                                                                      // }
693                                                                  }
694                                                              });
695            return actionsCount;
696        }
697    
698        /**
699         * Loads given number of actions for the given Coordinator job.
700         *
701         * @param jobId coordinator job id
702         * @param start offset for select statement
703         * @param len number of Workflow Actions to be returned
704         * @return A List of CoordinatorActionBean
705         * @throws StoreException
706         */
707        public List<CoordinatorActionBean> getActionsSubsetForCoordinatorJob(final String jobId, final int start,
708                                                                             final int len) throws StoreException {
709            ParamChecker.notEmpty(jobId, "CoordinatorJobID");
710            List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob",
711                                                              new Callable<List<CoordinatorActionBean>>() {
712                                                                  @SuppressWarnings("unchecked")
713                                                                  public List<CoordinatorActionBean> call() throws StoreException {
714                                                                      List<CoordinatorActionBean> actions;
715                                                                      List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
716                                                                      try {
717                                                                          Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
718                                                                          q.setParameter("jobId", jobId);
719                                                                          q.setFirstResult(start - 1);
720                                                                          q.setMaxResults(len);
721                                                                          actions = q.getResultList();
722                                                                          for (CoordinatorActionBean a : actions) {
723                                                                              CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
724                                                                              actionList.add(aa);
725                                                                          }
726                                                                      }
727                                                                      catch (IllegalStateException e) {
728                                                                          throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
729                                                                      }
730                                                                      return actionList;
731                                                                  }
732                                                              });
733            return actions;
734        }
735    
736        protected CoordinatorActionBean getBeanForRunningCoordAction(CoordinatorActionBean a) {
737            if (a != null) {
738                CoordinatorActionBean action = new CoordinatorActionBean();
739                action.setId(a.getId());
740                action.setActionNumber(a.getActionNumber());
741                action.setActionXml(a.getActionXml());
742                action.setConsoleUrl(a.getConsoleUrl());
743                action.setCreatedConf(a.getCreatedConf());
744                //action.setErrorCode(a.getErrorCode());
745                //action.setErrorMessage(a.getErrorMessage());
746                action.setExternalStatus(a.getExternalStatus());
747                action.setMissingDependencies(a.getMissingDependencies());
748                action.setRunConf(a.getRunConf());
749                action.setTimeOut(a.getTimeOut());
750                action.setTrackerUri(a.getTrackerUri());
751                action.setType(a.getType());
752                action.setCreatedTime(a.getCreatedTime());
753                action.setExternalId(a.getExternalId());
754                action.setJobId(a.getJobId());
755                action.setLastModifiedTime(a.getLastModifiedTime());
756                action.setNominalTime(a.getNominalTime());
757                action.setSlaXml(a.getSlaXml());
758                action.setStatus(a.getStatus());
759                return action;
760            }
761            return null;
762        }
763    
    /**
     * Unimplemented stub: ignores both arguments and always returns <code>null</code>.
     *
     * @param id action id (currently unused)
     * @param b currently unused
     * @return always <code>null</code>
     */
    public CoordinatorActionBean getAction(String id, boolean b) {
        return null;
    }
767    
768    
769        public List<CoordinatorActionBean> getRunningActionsForCoordinatorJob(final String jobId, final boolean locking)
770                throws StoreException {
771            ParamChecker.notEmpty(jobId, "CoordinatorJobID");
772            List<CoordinatorActionBean> actions = doOperation("getRunningActionsForCoordinatorJob",
773                                                              new Callable<List<CoordinatorActionBean>>() {
774                                                                  @SuppressWarnings("unchecked")
775                                                                  public List<CoordinatorActionBean> call() throws StoreException {
776                                                                      List<CoordinatorActionBean> actions;
777                                                                      try {
778                                                                          Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_FOR_COORD_JOB");
779                                                                          q.setParameter("jobId", jobId);
780                                                                          /*
781                                                                          * if (locking) {
782                                                                          * q.setHint("openjpa.FetchPlan.ReadLockMode",
783                                                                          * "READ"); OpenJPAQuery oq =
784                                                                          * OpenJPAPersistence.cast(q); FetchPlan fetch =
785                                                                          * oq.getFetchPlan();
786                                                                          * fetch.setReadLockMode(LockModeType.WRITE);
787                                                                          * fetch.setLockTimeout(-1); // no limit }
788                                                                          */
789                                                                          actions = q.getResultList();
790                                                                          return actions;
791                                                                      }
792                                                                      catch (IllegalStateException e) {
793                                                                          throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
794                                                                      }
795                                                                  }
796                                                              });
797            return actions;
798        }
799    
800        public List<CoordinatorActionBean> getRunningActionsOlderThan(final long checkAgeSecs, final boolean locking)
801                throws StoreException {
802            List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
803                                                              new Callable<List<CoordinatorActionBean>>() {
804                                                                  @SuppressWarnings("unchecked")
805                                                                  public List<CoordinatorActionBean> call() throws StoreException {
806                                                                      List<CoordinatorActionBean> actions;
807                                                                      Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
808                                                                      try {
809                                                                          Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_OLDER_THAN");
810                                                                          q.setParameter("lastModifiedTime", ts);
811                                                                          /*
812                                                                          * if (locking) { OpenJPAQuery oq =
813                                                                          * OpenJPAPersistence.cast(q); FetchPlan fetch =
814                                                                          * oq.getFetchPlan();
815                                                                          * fetch.setReadLockMode(LockModeType.WRITE);
816                                                                          * fetch.setLockTimeout(-1); // no limit }
817                                                                          */
818                                                                          actions = q.getResultList();
819                                                                          return actions;
820                                                                      }
821                                                                      catch (IllegalStateException e) {
822                                                                          throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
823                                                                      }
824                                                                  }
825                                                              });
826            return actions;
827        }
828    
829        public List<CoordinatorActionBean> getRecoveryActionsOlderThan(final long checkAgeSecs, final boolean locking)
830                throws StoreException {
831            List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
832                                                              new Callable<List<CoordinatorActionBean>>() {
833                                                                  @SuppressWarnings("unchecked")
834                                                                  public List<CoordinatorActionBean> call() throws StoreException {
835                                                                      List<CoordinatorActionBean> actions;
836                                                                      try {
837                                                                          Query q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN");
838                                                                          Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
839                                                                          q.setParameter("lastModifiedTime", ts);
840                                                                          /*
841                                                                          * if (locking) { OpenJPAQuery oq =
842                                                                          * OpenJPAPersistence.cast(q); FetchPlan fetch =
843                                                                          * oq.getFetchPlan();
844                                                                          * fetch.setReadLockMode(LockModeType.WRITE);
845                                                                          * fetch.setLockTimeout(-1); // no limit }
846                                                                          */
847                                                                          actions = q.getResultList();
848                                                                          return actions;
849                                                                      }
850                                                                      catch (IllegalStateException e) {
851                                                                          throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
852                                                                      }
853                                                                  }
854                                                              });
855            return actions;
856        }
857    
858        /**
859         * Get coordinator action beans for given start date and end date
860         *
861         * @param startDate
862         * @param endDate
863         * @return list of coordinator action beans
864         * @throws StoreException
865         */
866        public List<CoordinatorActionBean> getCoordActionsForDates(final String jobId, final Date startDate,
867                final Date endDate)
868                throws StoreException {
869            List<CoordinatorActionBean> actions = doOperation("getCoordActionsForDates",
870                    new Callable<List<CoordinatorActionBean>>() {
871                        @SuppressWarnings("unchecked")
872                        public List<CoordinatorActionBean> call() throws StoreException {
873                            List<CoordinatorActionBean> actions;
874                            try {
875                                Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_DATES");
876                                q.setParameter("jobId", jobId);
877                                q.setParameter("startTime", new Timestamp(startDate.getTime()));
878                                q.setParameter("endTime", new Timestamp(endDate.getTime()));
879                                actions = q.getResultList();
880    
881                                List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
882                                for (CoordinatorActionBean a : actions) {
883                                    CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
884                                    actionList.add(aa);
885                                }
886                                return actionList;
887                            }
888                            catch (IllegalStateException e) {
889                                throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
890                            }
891                        }
892                    });
893            return actions;
894        }
895    
896        /**
897         * Get coordinator action bean for given date
898         *
899         * @param nominalTime
900         * @return CoordinatorActionBean
901         * @throws StoreException
902         */
903        public CoordinatorActionBean getCoordActionForNominalTime(final String jobId, final Date nominalTime)
904                throws StoreException {
905            CoordinatorActionBean action = doOperation("getCoordActionForNominalTime",
906                    new Callable<CoordinatorActionBean>() {
907                @SuppressWarnings("unchecked")
908                public CoordinatorActionBean call() throws StoreException {
909                    List<CoordinatorActionBean> actions;
910                    Query q = entityManager.createNamedQuery("GET_ACTION_FOR_NOMINALTIME");
911                    q.setParameter("jobId", jobId);
912                    q.setParameter("nominalTime", new Timestamp(nominalTime.getTime()));
913                    actions = q.getResultList();
914    
915                    CoordinatorActionBean action = null;
916                    if (actions.size() > 0) {
917                        action = actions.get(0);
918                    }
919                    else {
920                        throw new StoreException(ErrorCode.E0605, DateUtils.formatDateOozieTZ(nominalTime));
921                    }
922                    return getBeanForRunningCoordAction(action);
923                }
924            });
925            return action;
926        }
927    
928        public List<String> getRecoveryActionsGroupByJobId(final long checkAgeSecs) throws StoreException {
929            List<String> jobids = doOperation("getRecoveryActionsGroupByJobId", new Callable<List<String>>() {
930                @SuppressWarnings("unchecked")
931                public List<String> call() throws StoreException {
932                    List<String> jobids = new ArrayList<String>();
933                    try {
934                        Query q = entityManager.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID");
935                        Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
936                        q.setParameter(1, ts);
937                        List<Object[]> list = q.getResultList();
938    
939                        for (Object[] arr : list) {
940                            if (arr != null && arr[0] != null) {
941                                jobids.add((String) arr[0]);
942                            }
943                        }
944    
945                        return jobids;
946                    }
947                    catch (IllegalStateException e) {
948                        throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
949                    }
950                }
951            });
952            return jobids;
953        }
954    }