This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.store;
019
020 import java.sql.SQLException;
021 import java.sql.Timestamp;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.concurrent.Callable;
027
028 import javax.persistence.EntityManager;
029 import javax.persistence.Query;
030
031 import org.apache.oozie.CoordinatorActionBean;
032 import org.apache.oozie.CoordinatorJobBean;
033 import org.apache.oozie.CoordinatorJobInfo;
034 import org.apache.oozie.ErrorCode;
035 import org.apache.oozie.client.Job.Status;
036 import org.apache.oozie.client.CoordinatorJob.Timeunit;
037 import org.apache.oozie.service.InstrumentationService;
038 import org.apache.oozie.service.Services;
039 import org.apache.oozie.util.DateUtils;
040 import org.apache.oozie.util.Instrumentation;
041 import org.apache.oozie.util.ParamChecker;
042 import org.apache.oozie.util.XLog;
043 import org.apache.oozie.workflow.WorkflowException;
044 import org.apache.openjpa.persistence.OpenJPAPersistence;
045 import org.apache.openjpa.persistence.OpenJPAQuery;
046 import org.apache.openjpa.persistence.jdbc.FetchDirection;
047 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
048 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
049 import org.apache.openjpa.persistence.jdbc.ResultSetType;
050
051 /**
052 * DB Implementation of Coord Store
053 */
054 public class CoordinatorStore extends Store {
055 private final XLog log = XLog.getLog(getClass());
056
057 private EntityManager entityManager;
058 private static final String INSTR_GROUP = "db";
059 public static final int LOCK_TIMEOUT = 50000;
060 private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
061
062 public CoordinatorStore(boolean selectForUpdate) throws StoreException {
063 super();
064 entityManager = getEntityManager();
065 }
066
067 public CoordinatorStore(Store store, boolean selectForUpdate) throws StoreException {
068 super(store);
069 entityManager = getEntityManager();
070 }
071
072 /**
073 * Create a CoordJobBean. It also creates the process instance for the job.
074 *
075 * @param workflow workflow bean
076 * @throws StoreException
077 */
078
079 public void insertCoordinatorJob(final CoordinatorJobBean coordinatorJob) throws StoreException {
080 ParamChecker.notNull(coordinatorJob, "coordinatorJob");
081
082 doOperation("insertCoordinatorJob", new Callable<Void>() {
083 public Void call() throws StoreException {
084 entityManager.persist(coordinatorJob);
085 return null;
086 }
087 });
088 }
089
090 /**
091 * Load the CoordinatorJob into a Bean and return it. Also load the Workflow Instance into the bean. And lock the
092 * Workflow depending on the locking parameter.
093 *
094 * @param id Job ID
095 * @param locking Flag for Table Lock
096 * @return CoordinatorJobBean
097 * @throws StoreException
098 */
099 public CoordinatorJobBean getCoordinatorJob(final String id, final boolean locking) throws StoreException {
100 ParamChecker.notEmpty(id, "CoordJobId");
101 CoordinatorJobBean cjBean = doOperation("getCoordinatorJob", new Callable<CoordinatorJobBean>() {
102 @SuppressWarnings("unchecked")
103 public CoordinatorJobBean call() throws StoreException {
104 Query q = entityManager.createNamedQuery("GET_COORD_JOB");
105 q.setParameter("id", id);
106 /*
107 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
108 * // q.setHint("openjpa.FetchPlan.ReadLockMode","WRITE");
109 * FetchPlan fetch = oq.getFetchPlan();
110 * fetch.setReadLockMode(LockModeType.WRITE);
111 * fetch.setLockTimeout(-1); // 1 second }
112 */
113 List<CoordinatorJobBean> cjBeans = q.getResultList();
114
115 if (cjBeans.size() > 0) {
116 return cjBeans.get(0);
117 }
118 else {
119 throw new StoreException(ErrorCode.E0604, id);
120 }
121 }
122 });
123
124 cjBean.setStatus(cjBean.getStatus());
125 return cjBean;
126 }
127
128 /**
129 * Get a list of Coordinator Jobs that should be materialized. Jobs with a 'last materialized time' older than the
130 * argument will be returned.
131 *
132 * @param d Date
133 * @return List of Coordinator Jobs that have a last materialized time older than input date
134 * @throws StoreException
135 */
136 public List<CoordinatorJobBean> getCoordinatorJobsToBeMaterialized(final Date d, final int limit)
137 throws StoreException {
138
139 ParamChecker.notNull(d, "Coord Job Materialization Date");
140 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsToBeMaterialized",
141 new Callable<List<CoordinatorJobBean>>() {
142 public List<CoordinatorJobBean> call() throws StoreException {
143
144 List<CoordinatorJobBean> cjBeans;
145 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
146 try {
147 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN");
148 q.setParameter("matTime", new Timestamp(d.getTime()));
149 if (limit > 0) {
150 q.setMaxResults(limit);
151 }
152 /*
153 OpenJPAQuery oq = OpenJPAPersistence.cast(q);
154 FetchPlan fetch = oq.getFetchPlan();
155 fetch.setReadLockMode(LockModeType.WRITE);
156 fetch.setLockTimeout(-1); // no limit
157 */
158 cjBeans = q.getResultList();
159 // copy results to a new object
160 for (CoordinatorJobBean j : cjBeans) {
161 jobList.add(j);
162 }
163 }
164 catch (IllegalStateException e) {
165 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
166 }
167 return jobList;
168
169 }
170 });
171 return cjBeans;
172 }
173
174 /**
175 * A list of Coordinator Jobs that are matched with the status and have last materialized time' older than
176 * checkAgeSecs will be returned.
177 *
178 * @param checkAgeSecs Job age in Seconds
179 * @param status Coordinator Job Status
180 * @param limit Number of results to return
181 * @param locking Flag for Table Lock
182 * @return List of Coordinator Jobs that are matched with the parameters.
183 * @throws StoreException
184 */
185 public List<CoordinatorJobBean> getCoordinatorJobsOlderThanStatus(final long checkAgeSecs, final String status,
186 final int limit, final boolean locking) throws StoreException {
187
188 ParamChecker.notNull(status, "Coord Job Status");
189 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsOlderThanStatus",
190 new Callable<List<CoordinatorJobBean>>() {
191 public List<CoordinatorJobBean> call() throws StoreException {
192
193 List<CoordinatorJobBean> cjBeans;
194 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
195 try {
196 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN_STATUS");
197 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
198 q.setParameter("lastModTime", ts);
199 q.setParameter("status", status);
200 if (limit > 0) {
201 q.setMaxResults(limit);
202 }
203 /*
204 * if (locking) { OpenJPAQuery oq =
205 * OpenJPAPersistence.cast(q); FetchPlan fetch =
206 * oq.getFetchPlan();
207 * fetch.setReadLockMode(LockModeType.WRITE);
208 * fetch.setLockTimeout(-1); // no limit }
209 */
210 cjBeans = q.getResultList();
211 for (CoordinatorJobBean j : cjBeans) {
212 jobList.add(j);
213 }
214 }
215 catch (Exception e) {
216 throw new StoreException(ErrorCode.E0603, e.getMessage(), e);
217 }
218 return jobList;
219
220 }
221 });
222 return cjBeans;
223 }
224
225 /**
226 * Load the CoordinatorAction into a Bean and return it.
227 *
228 * @param id action ID
229 * @return CoordinatorActionBean
230 * @throws StoreException
231 */
232 public CoordinatorActionBean getCoordinatorAction(final String id, final boolean locking) throws StoreException {
233 ParamChecker.notEmpty(id, "actionID");
234 CoordinatorActionBean caBean = doOperation("getCoordinatorAction", new Callable<CoordinatorActionBean>() {
235 public CoordinatorActionBean call() throws StoreException {
236 Query q = entityManager.createNamedQuery("GET_COORD_ACTION");
237 q.setParameter("id", id);
238 OpenJPAQuery oq = OpenJPAPersistence.cast(q);
239 /*
240 * if (locking) { //q.setHint("openjpa.FetchPlan.ReadLockMode",
241 * "WRITE"); FetchPlan fetch = oq.getFetchPlan();
242 * fetch.setReadLockMode(LockModeType.WRITE);
243 * fetch.setLockTimeout(-1); // no limit }
244 */
245
246 CoordinatorActionBean action = null;
247 List<CoordinatorActionBean> actions = q.getResultList();
248 if (actions.size() > 0) {
249 action = actions.get(0);
250 }
251 else {
252 throw new StoreException(ErrorCode.E0605, id);
253 }
254
255 /*
256 * if (locking) return action; else
257 */
258 return getBeanForRunningCoordAction(action);
259 }
260 });
261 return caBean;
262 }
263
264 /**
265 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
266 * concurrency number. Sort returned actions based on execution order (FIFO, LIFO, LAST_ONLY)
267 *
268 * @param id job ID
269 * @param numResults number of results to return
270 * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY
271 * @return List of CoordinatorActionBean
272 * @throws StoreException
273 */
274 public List<CoordinatorActionBean> getCoordinatorActionsForJob(final String id, final int numResults,
275 final String executionOrder) throws StoreException {
276 ParamChecker.notEmpty(id, "jobID");
277 List<CoordinatorActionBean> caBeans = doOperation("getCoordinatorActionsForJob",
278 new Callable<List<CoordinatorActionBean>>() {
279 public List<CoordinatorActionBean> call() throws StoreException {
280
281 List<CoordinatorActionBean> caBeans;
282 Query q;
283 // check if executionOrder is FIFO, LIFO, or LAST_ONLY
284 if (executionOrder.equalsIgnoreCase("FIFO")) {
285 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
286 }
287 else {
288 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO");
289 }
290 q.setParameter("jobId", id);
291 // if executionOrder is LAST_ONLY, only retrieve first
292 // record in LIFO,
293 // otherwise, use numResults if it is positive.
294 if (executionOrder.equalsIgnoreCase("LAST_ONLY")) {
295 q.setMaxResults(1);
296 }
297 else {
298 if (numResults > 0) {
299 q.setMaxResults(numResults);
300 }
301 }
302 caBeans = q.getResultList();
303 return caBeans;
304 }
305 });
306 return caBeans;
307 }
308
309 /**
310 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
311 * concurrency number.
312 *
313 * @param id job ID
314 * @return Number of running actions
315 * @throws StoreException
316 */
317 public int getCoordinatorRunningActionsCount(final String id) throws StoreException {
318 ParamChecker.notEmpty(id, "jobID");
319 Integer cnt = doOperation("getCoordinatorRunningActionsCount", new Callable<Integer>() {
320 public Integer call() throws SQLException {
321
322 Query q = entityManager.createNamedQuery("GET_COORD_RUNNING_ACTIONS_COUNT");
323
324 q.setParameter("jobId", id);
325 Long count = (Long) q.getSingleResult();
326 return Integer.valueOf(count.intValue());
327 }
328 });
329 return cnt.intValue();
330 }
331
332 /**
333 * Create a new Action record in the ACTIONS table with the given Bean.
334 *
335 * @param action WorkflowActionBean
336 * @throws StoreException If the action is already present
337 */
338 public void insertCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
339 ParamChecker.notNull(action, "CoordinatorActionBean");
340 doOperation("insertCoordinatorAction", new Callable<Void>() {
341 public Void call() throws StoreException {
342 entityManager.persist(action);
343 return null;
344 }
345 });
346 }
347
348 /**
349 * Update the given action bean to DB.
350 *
351 * @param action Action Bean
352 * @throws StoreException if action doesn't exist
353 */
354 public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
355 ParamChecker.notNull(action, "CoordinatorActionBean");
356 doOperation("updateCoordinatorAction", new Callable<Void>() {
357 public Void call() throws StoreException {
358 Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION");
359 q.setParameter("id", action.getId());
360 setActionQueryParameters(action, q);
361 q.executeUpdate();
362 return null;
363 }
364 });
365 }
366
367 /**
368 * Update the given action bean to DB.
369 *
370 * @param action Action Bean
371 * @throws StoreException if action doesn't exist
372 */
373 public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException {
374 ParamChecker.notNull(action, "CoordinatorActionBean");
375 doOperation("updateCoordinatorAction", new Callable<Void>() {
376 public Void call() throws StoreException {
377 Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION_MIN");
378 q.setParameter("id", action.getId());
379 q.setParameter("missingDependencies", action.getMissingDependencies());
380 q.setParameter("lastModifiedTime", new Date());
381 q.setParameter("status", action.getStatus().toString());
382 q.setParameter("actionXml", action.getActionXml());
383 q.executeUpdate();
384 return null;
385 }
386 });
387 }
388
389 /**
390 * Update the given coordinator job bean to DB.
391 *
392 * @param jobbean Coordinator Job Bean
393 * @throws StoreException if action doesn't exist
394 */
395 public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException {
396 ParamChecker.notNull(job, "CoordinatorJobBean");
397 doOperation("updateJob", new Callable<Void>() {
398 public Void call() throws StoreException {
399 Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB");
400 q.setParameter("id", job.getId());
401 setJobQueryParameters(job, q);
402 q.executeUpdate();
403 return null;
404 }
405 });
406 }
407
408 public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException {
409 ParamChecker.notNull(job, "CoordinatorJobBean");
410 doOperation("updateJobStatus", new Callable<Void>() {
411 public Void call() throws StoreException {
412 Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB_STATUS");
413 q.setParameter("id", job.getId());
414 q.setParameter("status", job.getStatus().toString());
415 q.setParameter("lastModifiedTime", new Date());
416 q.executeUpdate();
417 return null;
418 }
419 });
420 }
421
422 private <V> V doOperation(String name, Callable<V> command) throws StoreException {
423 try {
424 Instrumentation.Cron cron = new Instrumentation.Cron();
425 cron.start();
426 V retVal;
427 try {
428 retVal = command.call();
429 }
430 finally {
431 cron.stop();
432 }
433 Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
434 return retVal;
435 }
436 catch (StoreException ex) {
437 throw ex;
438 }
439 catch (SQLException ex) {
440 throw new StoreException(ErrorCode.E0603, name, ex.getMessage(), ex);
441 }
442 catch (Exception e) {
443 throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
444 }
445 }
446
447 private void setJobQueryParameters(CoordinatorJobBean jBean, Query q) {
448 q.setParameter("appName", jBean.getAppName());
449 q.setParameter("appPath", jBean.getAppPath());
450 q.setParameter("concurrency", jBean.getConcurrency());
451 q.setParameter("conf", jBean.getConf());
452 q.setParameter("externalId", jBean.getExternalId());
453 q.setParameter("frequency", jBean.getFrequency());
454 q.setParameter("lastActionNumber", jBean.getLastActionNumber());
455 q.setParameter("timeOut", jBean.getTimeout());
456 q.setParameter("timeZone", jBean.getTimeZone());
457 q.setParameter("authToken", jBean.getAuthToken());
458 q.setParameter("createdTime", jBean.getCreatedTimestamp());
459 q.setParameter("endTime", jBean.getEndTimestamp());
460 q.setParameter("execution", jBean.getExecution());
461 q.setParameter("jobXml", jBean.getJobXml());
462 q.setParameter("lastAction", jBean.getLastActionTimestamp());
463 q.setParameter("lastModifiedTime", new Date());
464 q.setParameter("nextMaterializedTime", jBean.getNextMaterializedTimestamp());
465 q.setParameter("origJobXml", jBean.getOrigJobXml());
466 q.setParameter("slaXml", jBean.getSlaXml());
467 q.setParameter("startTime", jBean.getStartTimestamp());
468 q.setParameter("status", jBean.getStatus().toString());
469 q.setParameter("timeUnit", jBean.getTimeUnitStr());
470 }
471
472 private void setActionQueryParameters(CoordinatorActionBean aBean, Query q) {
473 q.setParameter("actionNumber", aBean.getActionNumber());
474 q.setParameter("actionXml", aBean.getActionXml());
475 q.setParameter("consoleUrl", aBean.getConsoleUrl());
476 q.setParameter("createdConf", aBean.getCreatedConf());
477 q.setParameter("errorCode", aBean.getErrorCode());
478 q.setParameter("errorMessage", aBean.getErrorMessage());
479 q.setParameter("externalStatus", aBean.getExternalStatus());
480 q.setParameter("missingDependencies", aBean.getMissingDependencies());
481 q.setParameter("runConf", aBean.getRunConf());
482 q.setParameter("timeOut", aBean.getTimeOut());
483 q.setParameter("trackerUri", aBean.getTrackerUri());
484 q.setParameter("type", aBean.getType());
485 q.setParameter("createdTime", aBean.getCreatedTimestamp());
486 q.setParameter("externalId", aBean.getExternalId());
487 q.setParameter("jobId", aBean.getJobId());
488 q.setParameter("lastModifiedTime", new Date());
489 q.setParameter("nominalTime", aBean.getNominalTimestamp());
490 q.setParameter("slaXml", aBean.getSlaXml());
491 q.setParameter("status", aBean.getStatus().toString());
492 }
493
494
495 /**
496 * Purge the coordinators completed older than given days.
497 *
498 * @param olderThanDays number of days for which to preserve the coordinators
499 * @param limit maximum number of coordinator jobs to be purged
500 * @throws StoreException
501 */
502 public void purge(final long olderThanDays, final int limit) throws StoreException {
503 doOperation("coord-purge", new Callable<Void>() {
504 public Void call() throws SQLException, StoreException, WorkflowException {
505 Timestamp lastModTm = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
506 Query jobQ = entityManager.createNamedQuery("GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS");
507 jobQ.setParameter("lastModTime", lastModTm);
508 jobQ.setMaxResults(limit);
509 List<CoordinatorJobBean> coordJobs = jobQ.getResultList();
510
511 int actionDeleted = 0;
512 if (coordJobs.size() != 0) {
513 for (CoordinatorJobBean coord : coordJobs) {
514 String jobId = coord.getId();
515 entityManager.remove(coord);
516 Query g = entityManager.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR");
517 g.setParameter("jobId", jobId);
518 actionDeleted += g.executeUpdate();
519 }
520 }
521
522 XLog.getLog(getClass()).debug("ENDED Coord Purge deleted jobs :" + coordJobs.size() + " and actions " + actionDeleted);
523 return null;
524 }
525 });
526 }
527
528 public void commit() throws StoreException {
529 }
530
531 public void close() throws StoreException {
532 }
533
534 public CoordinatorJobBean getCoordinatorJobs(String id) {
535 // TODO Auto-generated method stub
536 return null;
537 }
538
539 public CoordinatorJobInfo getCoordinatorInfo(final Map<String, List<String>> filter, final int start, final int len)
540 throws StoreException {
541
542 CoordinatorJobInfo coordJobInfo = doOperation("getCoordinatorJobInfo", new Callable<CoordinatorJobInfo>() {
543 public CoordinatorJobInfo call() throws SQLException, StoreException {
544 List<String> orArray = new ArrayList<String>();
545 List<String> colArray = new ArrayList<String>();
546 List<String> valArray = new ArrayList<String>();
547 StringBuilder sb = new StringBuilder("");
548
549 StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr,
550 StoreStatusFilter.coordCountStr);
551
552 int realLen = 0;
553
554 Query q = null;
555 Query qTotal = null;
556 if (orArray.size() == 0) {
557 q = entityManager.createNamedQuery("GET_COORD_JOBS_COLUMNS");
558 q.setFirstResult(start - 1);
559 q.setMaxResults(len);
560 qTotal = entityManager.createNamedQuery("GET_COORD_JOBS_COUNT");
561 }
562 else {
563 StringBuilder sbTotal = new StringBuilder(sb);
564 sb.append(" order by w.createdTimestamp desc ");
565 XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
566 q = entityManager.createQuery(sb.toString());
567 q.setFirstResult(start - 1);
568 q.setMaxResults(len);
569 qTotal = entityManager.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr,
570 StoreStatusFilter.coordCountStr));
571 }
572
573 for (int i = 0; i < orArray.size(); i++) {
574 q.setParameter(colArray.get(i), valArray.get(i));
575 qTotal.setParameter(colArray.get(i), valArray.get(i));
576 }
577
578 OpenJPAQuery kq = OpenJPAPersistence.cast(q);
579 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
580 fetch.setFetchBatchSize(20);
581 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
582 fetch.setFetchDirection(FetchDirection.FORWARD);
583 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
584 List<?> resultList = q.getResultList();
585 List<Object[]> objectArrList = (List<Object[]>) resultList;
586 List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>();
587
588 for (Object[] arr : objectArrList) {
589 CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr);
590 coordBeansList.add(ww);
591 }
592
593 realLen = ((Long) qTotal.getSingleResult()).intValue();
594
595 return new CoordinatorJobInfo(coordBeansList, start, len, realLen);
596 }
597 });
598 return coordJobInfo;
599 }
600
601 private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) {
602 CoordinatorJobBean bean = new CoordinatorJobBean();
603 bean.setId((String) arr[0]);
604 if (arr[1] != null) {
605 bean.setAppName((String) arr[1]);
606 }
607 if (arr[2] != null) {
608 bean.setStatus(Status.valueOf((String) arr[2]));
609 }
610 if (arr[3] != null) {
611 bean.setUser((String) arr[3]);
612 }
613 if (arr[4] != null) {
614 bean.setGroup((String) arr[4]);
615 }
616 if (arr[5] != null) {
617 bean.setStartTime((Timestamp) arr[5]);
618 }
619 if (arr[6] != null) {
620 bean.setEndTime((Timestamp) arr[6]);
621 }
622 if (arr[7] != null) {
623 bean.setAppPath((String) arr[7]);
624 }
625 if (arr[8] != null) {
626 bean.setConcurrency(((Integer) arr[8]).intValue());
627 }
628 if (arr[9] != null) {
629 bean.setFrequency(((Integer) arr[9]).intValue());
630 }
631 if (arr[10] != null) {
632 bean.setLastActionTime((Timestamp) arr[10]);
633 }
634 if (arr[11] != null) {
635 bean.setNextMaterializedTime((Timestamp) arr[11]);
636 }
637 if (arr[13] != null) {
638 bean.setTimeUnit(Timeunit.valueOf((String) arr[13]));
639 }
640 if (arr[14] != null) {
641 bean.setTimeZone((String) arr[14]);
642 }
643 if (arr[15] != null) {
644 bean.setTimeout((Integer) arr[15]);
645 }
646 return bean;
647 }
648
649 /**
650 * Loads all actions for the given Coordinator job.
651 *
652 * @param jobId coordinator job id
653 * @param locking true if Actions are to be locked
654 * @return A List of CoordinatorActionBean
655 * @throws StoreException
656 */
657 public List<CoordinatorActionBean> getActionsForCoordinatorJob(final String jobId, final boolean locking)
658 throws StoreException {
659 ParamChecker.notEmpty(jobId, "CoordinatorJobID");
660 List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob",
661 new Callable<List<CoordinatorActionBean>>() {
662 @SuppressWarnings("unchecked")
663 public List<CoordinatorActionBean> call() throws StoreException {
664 List<CoordinatorActionBean> actions;
665 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
666 try {
667 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
668 q.setParameter("jobId", jobId);
669 /*
670 * if (locking) { //
671 * q.setHint("openjpa.FetchPlan.ReadLockMode", //
672 * "READ"); OpenJPAQuery oq =
673 * OpenJPAPersistence.cast(q); JDBCFetchPlan fetch =
674 * (JDBCFetchPlan) oq.getFetchPlan();
675 * fetch.setReadLockMode(LockModeType.WRITE);
676 * fetch.setLockTimeout(-1); // 1 second }
677 */
678 actions = q.getResultList();
679 for (CoordinatorActionBean a : actions) {
680 CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
681 actionList.add(aa);
682 }
683 }
684 catch (IllegalStateException e) {
685 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
686 }
687 /*
688 * if (locking) { return actions; } else {
689 */
690 return actionList;
691 // }
692 }
693 });
694 return actions;
695 }
696
697 /**
698 * Loads given number of actions for the given Coordinator job.
699 *
700 * @param jobId coordinator job id
701 * @param start offset for select statement
702 * @param len number of Workflow Actions to be returned
703 * @return A List of CoordinatorActionBean
704 * @throws StoreException
705 */
706 public List<CoordinatorActionBean> getActionsSubsetForCoordinatorJob(final String jobId, final int start,
707 final int len) throws StoreException {
708 ParamChecker.notEmpty(jobId, "CoordinatorJobID");
709 List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob",
710 new Callable<List<CoordinatorActionBean>>() {
711 @SuppressWarnings("unchecked")
712 public List<CoordinatorActionBean> call() throws StoreException {
713 List<CoordinatorActionBean> actions;
714 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
715 try {
716 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
717 q.setParameter("jobId", jobId);
718 q.setFirstResult(start - 1);
719 q.setMaxResults(len);
720 actions = q.getResultList();
721 for (CoordinatorActionBean a : actions) {
722 CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
723 actionList.add(aa);
724 }
725 }
726 catch (IllegalStateException e) {
727 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
728 }
729 return actionList;
730 }
731 });
732 return actions;
733 }
734
735 protected CoordinatorActionBean getBeanForRunningCoordAction(CoordinatorActionBean a) {
736 if (a != null) {
737 CoordinatorActionBean action = new CoordinatorActionBean();
738 action.setId(a.getId());
739 action.setActionNumber(a.getActionNumber());
740 action.setActionXml(a.getActionXml());
741 action.setConsoleUrl(a.getConsoleUrl());
742 action.setCreatedConf(a.getCreatedConf());
743 //action.setErrorCode(a.getErrorCode());
744 //action.setErrorMessage(a.getErrorMessage());
745 action.setExternalStatus(a.getExternalStatus());
746 action.setMissingDependencies(a.getMissingDependencies());
747 action.setRunConf(a.getRunConf());
748 action.setTimeOut(a.getTimeOut());
749 action.setTrackerUri(a.getTrackerUri());
750 action.setType(a.getType());
751 action.setCreatedTime(a.getCreatedTime());
752 action.setExternalId(a.getExternalId());
753 action.setJobId(a.getJobId());
754 action.setLastModifiedTime(a.getLastModifiedTime());
755 action.setNominalTime(a.getNominalTime());
756 action.setSlaXml(a.getSlaXml());
757 action.setStatus(a.getStatus());
758 return action;
759 }
760 return null;
761 }
762
763 public CoordinatorActionBean getAction(String id, boolean b) {
764 return null;
765 }
766
767 /*
768 * do not need this public void updateCoordinatorActionForExternalId(final
769 * CoordinatorActionBean action) throws StoreException { // TODO
770 * Auto-generated method stub ParamChecker.notNull(action,
771 * "updateCoordinatorActionForExternalId");
772 * doOperation("updateCoordinatorActionForExternalId", new Callable<Void>()
773 * { public Void call() throws SQLException, StoreException,
774 * WorkflowException { Query q =
775 * entityManager.createNamedQuery("UPDATE_COORD_ACTION_FOR_EXTERNALID");
776 * setActionQueryParameters(action,q); q.executeUpdate(); return null; } });
777 * }
778 */
779 public CoordinatorActionBean getCoordinatorActionForExternalId(final String externalId) throws StoreException {
780 // TODO Auto-generated method stub
781 ParamChecker.notEmpty(externalId, "coodinatorActionExternalId");
782 CoordinatorActionBean cBean = doOperation("getCoordinatorActionForExternalId",
783 new Callable<CoordinatorActionBean>() {
784 public CoordinatorActionBean call() throws StoreException {
785 CoordinatorActionBean caBean = null;
786 Query q = entityManager.createNamedQuery("GET_COORD_ACTION_FOR_EXTERNALID");
787 q.setParameter("externalId", externalId);
788 List<CoordinatorActionBean> actionList = q.getResultList();
789 if (actionList.size() > 0) {
790 caBean = actionList.get(0);
791 }
792 return caBean;
793 }
794 });
795 return cBean;
796 }
797
798 public List<CoordinatorActionBean> getRunningActionsForCoordinatorJob(final String jobId, final boolean locking)
799 throws StoreException {
800 ParamChecker.notEmpty(jobId, "CoordinatorJobID");
801 List<CoordinatorActionBean> actions = doOperation("getRunningActionsForCoordinatorJob",
802 new Callable<List<CoordinatorActionBean>>() {
803 @SuppressWarnings("unchecked")
804 public List<CoordinatorActionBean> call() throws StoreException {
805 List<CoordinatorActionBean> actions;
806 try {
807 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_FOR_COORD_JOB");
808 q.setParameter("jobId", jobId);
809 /*
810 * if (locking) {
811 * q.setHint("openjpa.FetchPlan.ReadLockMode",
812 * "READ"); OpenJPAQuery oq =
813 * OpenJPAPersistence.cast(q); FetchPlan fetch =
814 * oq.getFetchPlan();
815 * fetch.setReadLockMode(LockModeType.WRITE);
816 * fetch.setLockTimeout(-1); // no limit }
817 */
818 actions = q.getResultList();
819 return actions;
820 }
821 catch (IllegalStateException e) {
822 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
823 }
824 }
825 });
826 return actions;
827 }
828
829 public List<CoordinatorActionBean> getRunningActionsOlderThan(final long checkAgeSecs, final boolean locking)
830 throws StoreException {
831 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
832 new Callable<List<CoordinatorActionBean>>() {
833 @SuppressWarnings("unchecked")
834 public List<CoordinatorActionBean> call() throws StoreException {
835 List<CoordinatorActionBean> actions;
836 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
837 try {
838 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_OLDER_THAN");
839 q.setParameter("lastModifiedTime", ts);
840 /*
841 * if (locking) { OpenJPAQuery oq =
842 * OpenJPAPersistence.cast(q); FetchPlan fetch =
843 * oq.getFetchPlan();
844 * fetch.setReadLockMode(LockModeType.WRITE);
845 * fetch.setLockTimeout(-1); // no limit }
846 */
847 actions = q.getResultList();
848 return actions;
849 }
850 catch (IllegalStateException e) {
851 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
852 }
853 }
854 });
855 return actions;
856 }
857
858 public List<CoordinatorActionBean> getRecoveryActionsOlderThan(final long checkAgeSecs, final boolean locking)
859 throws StoreException {
860 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
861 new Callable<List<CoordinatorActionBean>>() {
862 @SuppressWarnings("unchecked")
863 public List<CoordinatorActionBean> call() throws StoreException {
864 List<CoordinatorActionBean> actions;
865 try {
866 Query q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN");
867 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
868 q.setParameter("lastModifiedTime", ts);
869 /*
870 * if (locking) { OpenJPAQuery oq =
871 * OpenJPAPersistence.cast(q); FetchPlan fetch =
872 * oq.getFetchPlan();
873 * fetch.setReadLockMode(LockModeType.WRITE);
874 * fetch.setLockTimeout(-1); // no limit }
875 */
876 actions = q.getResultList();
877 return actions;
878 }
879 catch (IllegalStateException e) {
880 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
881 }
882 }
883 });
884 return actions;
885 }
886
887 /**
888 * Get coordinator action beans for given start date and end date
889 *
890 * @param startDate
891 * @param endDate
892 * @return list of coordinator action beans
893 * @throws StoreException
894 */
895 public List<CoordinatorActionBean> getCoordActionsForDates(final String jobId, final Date startDate,
896 final Date endDate)
897 throws StoreException {
898 List<CoordinatorActionBean> actions = doOperation("getCoordActionsForDates",
899 new Callable<List<CoordinatorActionBean>>() {
900 @SuppressWarnings("unchecked")
901 public List<CoordinatorActionBean> call() throws StoreException {
902 List<CoordinatorActionBean> actions;
903 try {
904 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_DATES");
905 q.setParameter("jobId", jobId);
906 q.setParameter("startTime", new Timestamp(startDate.getTime()));
907 q.setParameter("endTime", new Timestamp(endDate.getTime()));
908 actions = q.getResultList();
909
910 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
911 for (CoordinatorActionBean a : actions) {
912 CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
913 actionList.add(aa);
914 }
915 return actionList;
916 }
917 catch (IllegalStateException e) {
918 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
919 }
920 }
921 });
922 return actions;
923 }
924
925 /**
926 * Get coordinator action bean for given date
927 *
928 * @param nominalTime
929 * @return CoordinatorActionBean
930 * @throws StoreException
931 */
932 public CoordinatorActionBean getCoordActionForNominalTime(final String jobId, final Date nominalTime)
933 throws StoreException {
934 CoordinatorActionBean action = doOperation("getCoordActionForNominalTime",
935 new Callable<CoordinatorActionBean>() {
936 @SuppressWarnings("unchecked")
937 public CoordinatorActionBean call() throws StoreException {
938 List<CoordinatorActionBean> actions;
939 Query q = entityManager.createNamedQuery("GET_ACTION_FOR_NOMINALTIME");
940 q.setParameter("jobId", jobId);
941 q.setParameter("nominalTime", new Timestamp(nominalTime.getTime()));
942 actions = q.getResultList();
943
944 CoordinatorActionBean action = null;
945 if (actions.size() > 0) {
946 action = actions.get(0);
947 }
948 else {
949 throw new StoreException(ErrorCode.E0605, DateUtils.convertDateToString(nominalTime));
950 }
951 return getBeanForRunningCoordAction(action);
952 }
953 });
954 return action;
955 }
956
957 public List<String> getRecoveryActionsGroupByJobId(final long checkAgeSecs) throws StoreException {
958 List<String> jobids = doOperation("getRecoveryActionsGroupByJobId", new Callable<List<String>>() {
959 @SuppressWarnings("unchecked")
960 public List<String> call() throws StoreException {
961 List<String> jobids = new ArrayList<String>();
962 try {
963 Query q = entityManager.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID");
964 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
965 q.setParameter(1, ts);
966 List<Object[]> list = q.getResultList();
967
968 for (Object[] arr : list) {
969 if (arr != null && arr[0] != null) {
970 jobids.add((String) arr[0]);
971 }
972 }
973
974 return jobids;
975 }
976 catch (IllegalStateException e) {
977 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
978 }
979 }
980 });
981 return jobids;
982 }
983 }