001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.store;
019
020import java.sql.SQLException;
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.Callable;
027
028import javax.persistence.EntityManager;
029import javax.persistence.Query;
030
031import org.apache.oozie.CoordinatorActionBean;
032import org.apache.oozie.CoordinatorJobBean;
033import org.apache.oozie.CoordinatorJobInfo;
034import org.apache.oozie.ErrorCode;
035import org.apache.oozie.client.Job.Status;
036import org.apache.oozie.client.CoordinatorJob.Timeunit;
037import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
038import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
040import org.apache.oozie.executor.jpa.JPAExecutorException;
041import org.apache.oozie.service.InstrumentationService;
042import org.apache.oozie.service.Services;
043import org.apache.oozie.util.DateUtils;
044import org.apache.oozie.util.Instrumentation;
045import org.apache.oozie.util.ParamChecker;
046import org.apache.oozie.util.XLog;
047import org.apache.oozie.workflow.WorkflowException;
048import org.apache.openjpa.persistence.OpenJPAPersistence;
049import org.apache.openjpa.persistence.OpenJPAQuery;
050import org.apache.openjpa.persistence.jdbc.FetchDirection;
051import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
052import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
053import org.apache.openjpa.persistence.jdbc.ResultSetType;
054
055/**
056 * DB Implementation of Coord Store
057 */
058public class CoordinatorStore extends Store {
059    private final XLog log = XLog.getLog(getClass());
060
061    private EntityManager entityManager;
062    private static final String INSTR_GROUP = "db";
063    public static final int LOCK_TIMEOUT = 50000;
064    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
065
/**
 * Creates a store through the no-argument parent constructor and keeps the entity manager returned by
 * {@code getEntityManager()}.
 *
 * @param selectForUpdate not read by this constructor
 * @throws StoreException declared by the signature; NOTE(review): presumably raised by the parent
 *         constructor or by {@code getEntityManager()} - confirm
 */
public CoordinatorStore(boolean selectForUpdate) throws StoreException {
    super();
    this.entityManager = getEntityManager();
}
070
/**
 * Creates a store on top of an existing {@code Store} (handed to the parent constructor) and keeps the entity
 * manager returned by {@code getEntityManager()}.
 *
 * @param store existing store passed to the parent constructor
 * @param selectForUpdate not read by this constructor
 * @throws StoreException declared by the signature; NOTE(review): presumably raised by the parent
 *         constructor or by {@code getEntityManager()} - confirm
 */
public CoordinatorStore(Store store, boolean selectForUpdate) throws StoreException {
    super(store);
    this.entityManager = getEntityManager();
}
075
076    /**
077     * Create a CoordJobBean. It also creates the process instance for the job.
078     *
079     * @param workflow workflow bean
080     * @throws StoreException
081     */
082
083    public void insertCoordinatorJob(final CoordinatorJobBean coordinatorJob) throws StoreException {
084        ParamChecker.notNull(coordinatorJob, "coordinatorJob");
085
086        doOperation("insertCoordinatorJob", new Callable<Void>() {
087            public Void call() throws StoreException {
088                entityManager.persist(coordinatorJob);
089                return null;
090            }
091        });
092    }
093
094    /**
095     * Load the CoordinatorJob into a Bean and return it. Also load the Workflow Instance into the bean. And lock the
096     * Workflow depending on the locking parameter.
097     *
098     * @param id Job ID
099     * @param locking Flag for Table Lock
100     * @return CoordinatorJobBean
101     * @throws StoreException
102     */
103    public CoordinatorJobBean getCoordinatorJob(final String id, final boolean locking) throws StoreException {
104        ParamChecker.notEmpty(id, "CoordJobId");
105        CoordinatorJobBean cjBean = doOperation("getCoordinatorJob", new Callable<CoordinatorJobBean>() {
106            @SuppressWarnings("unchecked")
107            public CoordinatorJobBean call() throws StoreException {
108                Query q = entityManager.createNamedQuery("GET_COORD_JOB");
109                q.setParameter("id", id);
110                /*
111                 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
112                 * // q.setHint("openjpa.FetchPlan.ReadLockMode","WRITE");
113                 * FetchPlan fetch = oq.getFetchPlan();
114                 * fetch.setReadLockMode(LockModeType.WRITE);
115                 * fetch.setLockTimeout(-1); // 1 second }
116                 */
117                List<CoordinatorJobBean> cjBeans = q.getResultList();
118
119                if (cjBeans.size() > 0) {
120                    return cjBeans.get(0);
121                }
122                else {
123                    throw new StoreException(ErrorCode.E0604, id);
124                }
125            }
126        });
127
128        cjBean.setStatus(cjBean.getStatus());
129        return cjBean;
130    }
131
132    /**
133     * Get a list of Coordinator Jobs that should be materialized. Jobs with a 'last materialized time' older than the
134     * argument will be returned.
135     *
136     * @param d Date
137     * @return List of Coordinator Jobs that have a last materialized time older than input date
138     * @throws StoreException
139     */
140    public List<CoordinatorJobBean> getCoordinatorJobsToBeMaterialized(final Date d, final int limit)
141            throws StoreException {
142
143        ParamChecker.notNull(d, "Coord Job Materialization Date");
144        List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsToBeMaterialized",
145                                                                                  new Callable<List<CoordinatorJobBean>>() {
146                                                                                      public List<CoordinatorJobBean> call() throws StoreException {
147
148                                                                                          List<CoordinatorJobBean> cjBeans;
149                                                                                          List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
150                                                                                          try {
151                                                                                              Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN");
152                                                                                              q.setParameter("matTime", new Timestamp(d.getTime()));
153                                                                                              if (limit > 0) {
154                                                                                                  q.setMaxResults(limit);
155                                                                                              }
156                                                                                              /*
157                                                                                              OpenJPAQuery oq = OpenJPAPersistence.cast(q);
158                                                                                              FetchPlan fetch = oq.getFetchPlan();
159                                                                                              fetch.setReadLockMode(LockModeType.WRITE);
160                                                                                              fetch.setLockTimeout(-1); // no limit
161                                                                                              */
162                                                                                              cjBeans = q.getResultList();
163                                                                                              // copy results to a new object
164                                                                                              for (CoordinatorJobBean j : cjBeans) {
165                                                                                                  jobList.add(j);
166                                                                                              }
167                                                                                          }
168                                                                                          catch (IllegalStateException e) {
169                                                                                              throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
170                                                                                          }
171                                                                                          return jobList;
172
173                                                                                      }
174                                                                                  });
175        return cjBeans;
176    }
177
178    /**
179     * A list of Coordinator Jobs that are matched with the status and have last materialized time' older than
180     * checkAgeSecs will be returned.
181     *
182     * @param checkAgeSecs Job age in Seconds
183     * @param status Coordinator Job Status
184     * @param limit Number of results to return
185     * @param locking Flag for Table Lock
186     * @return List of Coordinator Jobs that are matched with the parameters.
187     * @throws StoreException
188     */
189    public List<CoordinatorJobBean> getCoordinatorJobsOlderThanStatus(final long checkAgeSecs, final String status,
190                                                                      final int limit, final boolean locking) throws StoreException {
191
192        ParamChecker.notNull(status, "Coord Job Status");
193        List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsOlderThanStatus",
194                                                                                  new Callable<List<CoordinatorJobBean>>() {
195                                                                                      public List<CoordinatorJobBean> call() throws StoreException {
196
197                                                                                          List<CoordinatorJobBean> cjBeans;
198                                                                                          List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
199                                                                                          try {
200                                                                                              Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN_STATUS");
201                                                                                              Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
202                                                                                              q.setParameter("lastModTime", ts);
203                                                                                              q.setParameter("status", status);
204                                                                                              if (limit > 0) {
205                                                                                                  q.setMaxResults(limit);
206                                                                                              }
207                                                                                              /*
208                                                                                              * if (locking) { OpenJPAQuery oq =
209                                                                                              * OpenJPAPersistence.cast(q); FetchPlan fetch =
210                                                                                              * oq.getFetchPlan();
211                                                                                              * fetch.setReadLockMode(LockModeType.WRITE);
212                                                                                              * fetch.setLockTimeout(-1); // no limit }
213                                                                                              */
214                                                                                              cjBeans = q.getResultList();
215                                                                                              for (CoordinatorJobBean j : cjBeans) {
216                                                                                                  jobList.add(j);
217                                                                                              }
218                                                                                          }
219                                                                                          catch (Exception e) {
220                                                                                              throw new StoreException(ErrorCode.E0603, e.getMessage(), e);
221                                                                                          }
222                                                                                          return jobList;
223
224                                                                                      }
225                                                                                  });
226        return cjBeans;
227    }
228
229    /**
230     * Load the CoordinatorAction into a Bean and return it.
231     *
232     * @param id action ID
233     * @return CoordinatorActionBean
234     * @throws StoreException
235     */
236    public CoordinatorActionBean getCoordinatorAction(final String id, final boolean locking) throws StoreException {
237        ParamChecker.notEmpty(id, "actionID");
238        CoordinatorActionBean caBean = doOperation("getCoordinatorAction", new Callable<CoordinatorActionBean>() {
239            public CoordinatorActionBean call() throws StoreException {
240                Query q = entityManager.createNamedQuery("GET_COORD_ACTION");
241                q.setParameter("id", id);
242                OpenJPAQuery oq = OpenJPAPersistence.cast(q);
243                /*
244                 * if (locking) { //q.setHint("openjpa.FetchPlan.ReadLockMode",
245                 * "WRITE"); FetchPlan fetch = oq.getFetchPlan();
246                 * fetch.setReadLockMode(LockModeType.WRITE);
247                 * fetch.setLockTimeout(-1); // no limit }
248                 */
249
250                CoordinatorActionBean action = null;
251                List<CoordinatorActionBean> actions = q.getResultList();
252                if (actions.size() > 0) {
253                    action = actions.get(0);
254                }
255                else {
256                    throw new StoreException(ErrorCode.E0605, id);
257                }
258
259                /*
260                 * if (locking) return action; else
261                 */
262                return getBeanForRunningCoordAction(action);
263            }
264        });
265        return caBean;
266    }
267
268    /**
269     * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
270     * concurrency number. Sort returned actions based on execution order (FIFO, LIFO, LAST_ONLY, NONE)
271     *
272     * @param id job ID
273     * @param numResults number of results to return
274     * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY, NONE
275     * @return List of CoordinatorActionBean
276     * @throws StoreException
277     */
278    public List<CoordinatorActionBean> getCoordinatorActionsForJob(final String id, final int numResults,
279                                                                   final String executionOrder) throws StoreException {
280        ParamChecker.notEmpty(id, "jobID");
281        List<CoordinatorActionBean> caBeans = doOperation("getCoordinatorActionsForJob",
282                                                          new Callable<List<CoordinatorActionBean>>() {
283                                                              public List<CoordinatorActionBean> call() throws StoreException {
284
285                                                                  List<CoordinatorActionBean> caBeans;
286                                                                  Query q;
287                                                                  // check if executionOrder is FIFO, LIFO, NONE or LAST_ONLY
288                                                                  if (executionOrder.equalsIgnoreCase("FIFO")) {
289                                                                      q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
290                                                                  }
291                                                                  else {
292                                                                      q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO");
293                                                                  }
294                                                                  q.setParameter("jobId", id);
295                                                                  // if executionOrder is LAST_ONLY, only retrieve first
296                                                                  // record in LIFO,
297                                                                  // otherwise, use numResults if it is positive.
298                                                                  if (executionOrder.equalsIgnoreCase("LAST_ONLY")) {
299                                                                      q.setMaxResults(1);
300                                                                  }
301                                                                  else {
302                                                                      if (numResults > 0) {
303                                                                          q.setMaxResults(numResults);
304                                                                      }
305                                                                  }
306                                                                  caBeans = q.getResultList();
307                                                                  return caBeans;
308                                                              }
309                                                          });
310        return caBeans;
311    }
312
313    /**
314     * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
315     * concurrency number.
316     *
317     * @param id job ID
318     * @return Number of running actions
319     * @throws StoreException
320     */
321    public int getCoordinatorRunningActionsCount(final String id) throws StoreException {
322        ParamChecker.notEmpty(id, "jobID");
323        Integer cnt = doOperation("getCoordinatorRunningActionsCount", new Callable<Integer>() {
324            public Integer call() throws SQLException {
325
326                Query q = entityManager.createNamedQuery("GET_COORD_RUNNING_ACTIONS_COUNT");
327
328                q.setParameter("jobId", id);
329                Long count = (Long) q.getSingleResult();
330                return Integer.valueOf(count.intValue());
331            }
332        });
333        return cnt.intValue();
334    }
335
336    /**
337     * Create a new Action record in the ACTIONS table with the given Bean.
338     *
339     * @param action WorkflowActionBean
340     * @throws StoreException If the action is already present
341     */
342    public void insertCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
343        ParamChecker.notNull(action, "CoordinatorActionBean");
344        doOperation("insertCoordinatorAction", new Callable<Void>() {
345            public Void call() throws StoreException {
346                entityManager.persist(action);
347                return null;
348            }
349        });
350    }
351
352    /**
353     * Update the given action bean to DB.
354     *
355     * @param action Action Bean
356     * @throws StoreException if action doesn't exist
357     */
358    public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException, JPAExecutorException {
359        ParamChecker.notNull(action, "CoordinatorActionBean");
360        doOperation("updateCoordinatorAction", new Callable<Void>() {
361            public Void call() throws StoreException, JPAExecutorException {
362                CoordActionQueryExecutor.getInstance().executeUpdate(
363                        CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION, action);
364                return null;
365            }
366        });
367    }
368
369    /**
370     * Update the given action bean to DB.
371     *
372     * @param action Action Bean
373     * @throws StoreException if action doesn't exist
374     */
375    public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException, JPAExecutorException {
376        ParamChecker.notNull(action, "CoordinatorActionBean");
377        doOperation("updateCoordinatorAction", new Callable<Void>() {
378            public Void call() throws StoreException, JPAExecutorException {
379                CoordActionQueryExecutor.getInstance().executeUpdate(
380                        CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, action);
381                return null;
382            }
383        });
384    }
385
386    /**
387     * Update the given coordinator job bean to DB.
388     *
389     * @param jobbean Coordinator Job Bean
390     * @throws StoreException if action doesn't exist
391     */
392    public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException {
393        ParamChecker.notNull(job, "CoordinatorJobBean");
394        doOperation("updateJob", new Callable<Void>() {
395            public Void call() throws StoreException, JPAExecutorException {
396                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
397                return null;
398            }
399        });
400    }
401
402    public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException {
403        ParamChecker.notNull(job, "CoordinatorJobBean");
404        doOperation("updateJobStatus", new Callable<Void>() {
405            public Void call() throws StoreException, JPAExecutorException {
406                CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_MODTIME, job);
407                return null;
408            }
409        });
410    }
411
412    private <V> V doOperation(String name, Callable<V> command) throws StoreException {
413        try {
414            Instrumentation.Cron cron = new Instrumentation.Cron();
415            cron.start();
416            V retVal;
417            try {
418                retVal = command.call();
419            }
420            finally {
421                cron.stop();
422            }
423            Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
424            return retVal;
425        }
426        catch (StoreException ex) {
427            throw ex;
428        }
429        catch (SQLException ex) {
430            throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
431        }
432        catch (Exception e) {
433            throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
434        }
435    }
436
437    /**
438     * Purge the coordinators completed older than given days.
439     *
440     * @param olderThanDays number of days for which to preserve the coordinators
441     * @param limit maximum number of coordinator jobs to be purged
442     * @throws StoreException
443     */
444    public void purge(final long olderThanDays, final int limit) throws StoreException {
445        doOperation("coord-purge", new Callable<Void>() {
446            public Void call() throws SQLException, StoreException, WorkflowException {
447                Timestamp lastModTm = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
448                Query jobQ = entityManager.createNamedQuery("GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS");
449                jobQ.setParameter("lastModTime", lastModTm);
450                jobQ.setMaxResults(limit);
451                List<CoordinatorJobBean> coordJobs = jobQ.getResultList();
452
453                int actionDeleted = 0;
454                if (coordJobs.size() != 0) {
455                    for (CoordinatorJobBean coord : coordJobs) {
456                        String jobId = coord.getId();
457                        entityManager.remove(coord);
458                        Query g = entityManager.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR");
459                        g.setParameter("jobId", jobId);
460                        actionDeleted += g.executeUpdate();
461                    }
462                }
463
464                XLog.getLog(getClass()).debug("ENDED Coord Purge deleted jobs :" + coordJobs.size() + " and actions " + actionDeleted);
465                return null;
466            }
467        });
468    }
469
470    public void commit() throws StoreException {
471    }
472
473    public void close() throws StoreException {
474    }
475
476    public CoordinatorJobBean getCoordinatorJobs(String id) {
477        // TODO Auto-generated method stub
478        return null;
479    }
480
481    public CoordinatorJobInfo getCoordinatorInfo(final Map<String, List<String>> filter, final int start, final int len)
482            throws StoreException {
483
484        CoordinatorJobInfo coordJobInfo = doOperation("getCoordinatorJobInfo", new Callable<CoordinatorJobInfo>() {
485            public CoordinatorJobInfo call() throws SQLException, StoreException {
486                List<String> orArray = new ArrayList<String>();
487                List<String> colArray = new ArrayList<String>();
488                List<String> valArray = new ArrayList<String>();
489                StringBuilder sb = new StringBuilder("");
490
491                StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr,
492                                         StoreStatusFilter.coordCountStr);
493
494                int realLen = 0;
495
496                Query q = null;
497                Query qTotal = null;
498                if (orArray.size() == 0) {
499                    q = entityManager.createNamedQuery("GET_COORD_JOBS_COLUMNS");
500                    q.setFirstResult(start - 1);
501                    q.setMaxResults(len);
502                    qTotal = entityManager.createNamedQuery("GET_COORD_JOBS_COUNT");
503                }
504                else {
505                    StringBuilder sbTotal = new StringBuilder(sb);
506                    sb.append(" order by w.createdTimestamp desc ");
507                    XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
508                    q = entityManager.createQuery(sb.toString());
509                    q.setFirstResult(start - 1);
510                    q.setMaxResults(len);
511                    qTotal = entityManager.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr,
512                                                                                  StoreStatusFilter.coordCountStr));
513                }
514
515                for (int i = 0; i < orArray.size(); i++) {
516                    q.setParameter(colArray.get(i), valArray.get(i));
517                    qTotal.setParameter(colArray.get(i), valArray.get(i));
518                }
519
520                OpenJPAQuery kq = OpenJPAPersistence.cast(q);
521                JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
522                fetch.setFetchBatchSize(20);
523                fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
524                fetch.setFetchDirection(FetchDirection.FORWARD);
525                fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
526                List<?> resultList = q.getResultList();
527                List<Object[]> objectArrList = (List<Object[]>) resultList;
528                List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>();
529
530                for (Object[] arr : objectArrList) {
531                    CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr);
532                    coordBeansList.add(ww);
533                }
534
535                realLen = ((Long) qTotal.getSingleResult()).intValue();
536
537                return new CoordinatorJobInfo(coordBeansList, start, len, realLen);
538            }
539        });
540        return coordJobInfo;
541    }
542
543    private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) {
544        CoordinatorJobBean bean = new CoordinatorJobBean();
545        bean.setId((String) arr[0]);
546        if (arr[1] != null) {
547            bean.setAppName((String) arr[1]);
548        }
549        if (arr[2] != null) {
550            bean.setStatus(Status.valueOf((String) arr[2]));
551        }
552        if (arr[3] != null) {
553            bean.setUser((String) arr[3]);
554        }
555        if (arr[4] != null) {
556            bean.setGroup((String) arr[4]);
557        }
558        if (arr[5] != null) {
559            bean.setStartTime((Timestamp) arr[5]);
560        }
561        if (arr[6] != null) {
562            bean.setEndTime((Timestamp) arr[6]);
563        }
564        if (arr[7] != null) {
565            bean.setAppPath((String) arr[7]);
566        }
567        if (arr[8] != null) {
568            bean.setConcurrency(((Integer) arr[8]).intValue());
569        }
570        if (arr[9] != null) {
571            bean.setFrequency((String) arr[9]);
572        }
573        if (arr[10] != null) {
574            bean.setLastActionTime((Timestamp) arr[10]);
575        }
576        if (arr[11] != null) {
577            bean.setNextMaterializedTime((Timestamp) arr[11]);
578        }
579        if (arr[13] != null) {
580            bean.setTimeUnit(Timeunit.valueOf((String) arr[13]));
581        }
582        if (arr[14] != null) {
583            bean.setTimeZone((String) arr[14]);
584        }
585        if (arr[15] != null) {
586            bean.setTimeout((Integer) arr[15]);
587        }
588        return bean;
589    }
590
591    /**
592     * Loads all actions for the given Coordinator job.
593     *
594     * @param jobId coordinator job id
595     * @param locking true if Actions are to be locked
596     * @return A List of CoordinatorActionBean
597     * @throws StoreException
598     */
599    public Integer getActionsForCoordinatorJob(final String jobId, final boolean locking)
600            throws StoreException {
601        ParamChecker.notEmpty(jobId, "CoordinatorJobID");
602        Integer actionsCount = doOperation("getActionsForCoordinatorJob",
603                                                          new Callable<Integer>() {
604                                                              @SuppressWarnings("unchecked")
605                                                              public Integer call() throws StoreException {
606                                                                  List<CoordinatorActionBean> actions;
607                                                                  List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
608                                                                  try {
609                                                                      Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
610                                                                      q.setParameter("jobId", jobId);
611                                                                      /*
612                                                                      * if (locking) { //
613                                                                      * q.setHint("openjpa.FetchPlan.ReadLockMode", //
614                                                                      * "READ"); OpenJPAQuery oq =
615                                                                      * OpenJPAPersistence.cast(q); JDBCFetchPlan fetch =
616                                                                      * (JDBCFetchPlan) oq.getFetchPlan();
617                                                                      * fetch.setReadLockMode(LockModeType.WRITE);
618                                                                      * fetch.setLockTimeout(-1); // 1 second }
619                                                                      */
620                                                                      Long count = (Long) q.getSingleResult();
621                                                                      return Integer.valueOf(count.intValue());
622                                                                      /*actions = q.getResultList();
623                                                                      for (CoordinatorActionBean a : actions) {
624                                                                          CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
625                                                                          actionList.add(aa);
626                                                                      }*/
627                                                                  }
628                                                                  catch (IllegalStateException e) {
629                                                                      throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
630                                                                  }
631                                                                  /*
632                                                                  * if (locking) { return actions; } else {
633                                                                  */
634
635                                                                  // }
636                                                              }
637                                                          });
638        return actionsCount;
639    }
640
641    /**
642     * Loads given number of actions for the given Coordinator job.
643     *
644     * @param jobId coordinator job id
645     * @param start offset for select statement
646     * @param len number of Workflow Actions to be returned
647     * @return A List of CoordinatorActionBean
648     * @throws StoreException
649     */
650    public List<CoordinatorActionBean> getActionsSubsetForCoordinatorJob(final String jobId, final int start,
651            final int len) throws StoreException {
652        ParamChecker.notEmpty(jobId, "CoordinatorJobID");
653        List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob",
654                new Callable<List<CoordinatorActionBean>>() {
655                    @SuppressWarnings("unchecked")
656                    public List<CoordinatorActionBean> call() throws StoreException {
657                        List<CoordinatorActionBean> actions;
658                        List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
659                        try {
660                            Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
661                            q.setParameter("jobId", jobId);
662                            q.setFirstResult(start - 1);
663                            q.setMaxResults(len);
664                            actions = q.getResultList();
665                            for (CoordinatorActionBean a : actions) {
666                                CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
667                                actionList.add(aa);
668                            }
669                        }
670                        catch (IllegalStateException e) {
671                            throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
672                        }
673                        return actionList;
674                    }
675                });
676        return actions;
677    }
678
679    protected CoordinatorActionBean getBeanForRunningCoordAction(CoordinatorActionBean a) {
680        if (a != null) {
681            CoordinatorActionBean action = new CoordinatorActionBean();
682            action.setId(a.getId());
683            action.setActionNumber(a.getActionNumber());
684            action.setActionXmlBlob(a.getActionXmlBlob());
685            action.setConsoleUrl(a.getConsoleUrl());
686            action.setCreatedConfBlob(a.getCreatedConfBlob());
687            action.setErrorCode(a.getErrorCode());
688            action.setErrorMessage(a.getErrorMessage());
689            action.setExternalStatus(a.getExternalStatus());
690            action.setMissingDependenciesBlob(a.getMissingDependenciesBlob());
691            action.setRunConfBlob(a.getRunConfBlob());
692            action.setTimeOut(a.getTimeOut());
693            action.setTrackerUri(a.getTrackerUri());
694            action.setType(a.getType());
695            action.setCreatedTime(a.getCreatedTime());
696            action.setExternalId(a.getExternalId());
697            action.setJobId(a.getJobId());
698            action.setLastModifiedTime(a.getLastModifiedTime());
699            action.setNominalTime(a.getNominalTime());
700            action.setSlaXmlBlob(a.getSlaXmlBlob());
701            action.setStatus(a.getStatus());
702            return action;
703        }
704        return null;
705    }
706
    /**
     * Stub: performs no lookup. Both arguments are ignored and {@code null} is always returned.
     *
     * @param id ignored
     * @param b ignored
     * @return always {@code null}
     */
    public CoordinatorActionBean getAction(String id, boolean b) {
        return null;
    }
710
711
712    public List<CoordinatorActionBean> getRunningActionsForCoordinatorJob(final String jobId, final boolean locking)
713            throws StoreException {
714        ParamChecker.notEmpty(jobId, "CoordinatorJobID");
715        List<CoordinatorActionBean> actions = doOperation("getRunningActionsForCoordinatorJob",
716                                                          new Callable<List<CoordinatorActionBean>>() {
717                                                              @SuppressWarnings("unchecked")
718                                                              public List<CoordinatorActionBean> call() throws StoreException {
719                                                                  List<CoordinatorActionBean> actions;
720                                                                  try {
721                                                                      Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_FOR_COORD_JOB");
722                                                                      q.setParameter("jobId", jobId);
723                                                                      /*
724                                                                      * if (locking) {
725                                                                      * q.setHint("openjpa.FetchPlan.ReadLockMode",
726                                                                      * "READ"); OpenJPAQuery oq =
727                                                                      * OpenJPAPersistence.cast(q); FetchPlan fetch =
728                                                                      * oq.getFetchPlan();
729                                                                      * fetch.setReadLockMode(LockModeType.WRITE);
730                                                                      * fetch.setLockTimeout(-1); // no limit }
731                                                                      */
732                                                                      actions = q.getResultList();
733                                                                      return actions;
734                                                                  }
735                                                                  catch (IllegalStateException e) {
736                                                                      throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
737                                                                  }
738                                                              }
739                                                          });
740        return actions;
741    }
742
743    public List<CoordinatorActionBean> getRunningActionsOlderThan(final long checkAgeSecs, final boolean locking)
744            throws StoreException {
745        List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
746                                                          new Callable<List<CoordinatorActionBean>>() {
747                                                              @SuppressWarnings("unchecked")
748                                                              public List<CoordinatorActionBean> call() throws StoreException {
749                                                                  List<CoordinatorActionBean> actions;
750                                                                  Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
751                                                                  try {
752                                                                      Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_OLDER_THAN");
753                                                                      q.setParameter("lastModifiedTime", ts);
754                                                                      /*
755                                                                      * if (locking) { OpenJPAQuery oq =
756                                                                      * OpenJPAPersistence.cast(q); FetchPlan fetch =
757                                                                      * oq.getFetchPlan();
758                                                                      * fetch.setReadLockMode(LockModeType.WRITE);
759                                                                      * fetch.setLockTimeout(-1); // no limit }
760                                                                      */
761                                                                      actions = q.getResultList();
762                                                                      return actions;
763                                                                  }
764                                                                  catch (IllegalStateException e) {
765                                                                      throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
766                                                                  }
767                                                              }
768                                                          });
769        return actions;
770    }
771
772    public List<CoordinatorActionBean> getRecoveryActionsOlderThan(final long checkAgeSecs, final boolean locking)
773            throws StoreException {
774        List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
775                                                          new Callable<List<CoordinatorActionBean>>() {
776                                                              @SuppressWarnings("unchecked")
777                                                              public List<CoordinatorActionBean> call() throws StoreException {
778                                                                  List<CoordinatorActionBean> actions;
779                                                                  try {
780                                                                      Query q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN");
781                                                                      Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
782                                                                      q.setParameter("lastModifiedTime", ts);
783                                                                      /*
784                                                                      * if (locking) { OpenJPAQuery oq =
785                                                                      * OpenJPAPersistence.cast(q); FetchPlan fetch =
786                                                                      * oq.getFetchPlan();
787                                                                      * fetch.setReadLockMode(LockModeType.WRITE);
788                                                                      * fetch.setLockTimeout(-1); // no limit }
789                                                                      */
790                                                                      actions = q.getResultList();
791                                                                      return actions;
792                                                                  }
793                                                                  catch (IllegalStateException e) {
794                                                                      throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
795                                                                  }
796                                                              }
797                                                          });
798        return actions;
799    }
800
801    /**
802     * Get coordinator action beans for given start date and end date
803     *
804     * @param startDate
805     * @param endDate
806     * @return list of coordinator action beans
807     * @throws StoreException
808     */
809    public List<CoordinatorActionBean> getCoordActionsForDates(final String jobId, final Date startDate,
810            final Date endDate) throws StoreException {
811        List<CoordinatorActionBean> actions = doOperation("getCoordActionsForDates",
812                new Callable<List<CoordinatorActionBean>>() {
813                    @SuppressWarnings("unchecked")
814                    public List<CoordinatorActionBean> call() throws StoreException {
815                        List<CoordinatorActionBean> actions;
816                        try {
817                            Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_DATES");
818                            q.setParameter("jobId", jobId);
819                            q.setParameter("startTime", new Timestamp(startDate.getTime()));
820                            q.setParameter("endTime", new Timestamp(endDate.getTime()));
821                            actions = q.getResultList();
822                            List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
823                            for (CoordinatorActionBean a : actions) {
824                                CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
825                                actionList.add(aa);
826                            }
827                            return actionList;
828                        }
829                        catch (IllegalStateException e) {
830                            throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
831                        }
832                    }
833                });
834        return actions;
835    }
836
837    /**
838     * Get coordinator action bean for given date
839     *
840     * @param nominalTime
841     * @return CoordinatorActionBean
842     * @throws StoreException
843     */
844    public CoordinatorActionBean getCoordActionForNominalTime(final String jobId, final Date nominalTime)
845            throws StoreException {
846        CoordinatorActionBean action = doOperation("getCoordActionForNominalTime",
847                new Callable<CoordinatorActionBean>() {
848            @SuppressWarnings("unchecked")
849            public CoordinatorActionBean call() throws StoreException {
850                List<CoordinatorActionBean> actions;
851                Query q = entityManager.createNamedQuery("GET_ACTION_FOR_NOMINALTIME");
852                q.setParameter("jobId", jobId);
853                q.setParameter("nominalTime", new Timestamp(nominalTime.getTime()));
854                actions = q.getResultList();
855
856                CoordinatorActionBean action = null;
857                if (actions.size() > 0) {
858                    action = actions.get(0);
859                }
860                else {
861                    throw new StoreException(ErrorCode.E0605, DateUtils.formatDateOozieTZ(nominalTime));
862                }
863                return getBeanForRunningCoordAction(action);
864            }
865        });
866        return action;
867    }
868
869    public List<String> getRecoveryActionsGroupByJobId(final long checkAgeSecs) throws StoreException {
870        List<String> jobids = doOperation("getRecoveryActionsGroupByJobId", new Callable<List<String>>() {
871            @SuppressWarnings("unchecked")
872            public List<String> call() throws StoreException {
873                List<String> jobids = new ArrayList<String>();
874                try {
875                    Query q = entityManager.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID");
876                    Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
877                    q.setParameter(1, ts);
878                    List<Object[]> list = q.getResultList();
879
880                    for (Object[] arr : list) {
881                        if (arr != null && arr[0] != null) {
882                            jobids.add((String) arr[0]);
883                        }
884                    }
885
886                    return jobids;
887                }
888                catch (IllegalStateException e) {
889                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
890                }
891            }
892        });
893        return jobids;
894    }
895}