This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.store;
019
020 import java.sql.SQLException;
021 import java.sql.Timestamp;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.concurrent.Callable;
027
028 import javax.persistence.EntityManager;
029 import javax.persistence.Query;
030
031 import org.apache.oozie.CoordinatorActionBean;
032 import org.apache.oozie.CoordinatorJobBean;
033 import org.apache.oozie.CoordinatorJobInfo;
034 import org.apache.oozie.ErrorCode;
035 import org.apache.oozie.client.Job.Status;
036 import org.apache.oozie.client.CoordinatorJob.Timeunit;
037 import org.apache.oozie.service.InstrumentationService;
038 import org.apache.oozie.service.Services;
039 import org.apache.oozie.util.DateUtils;
040 import org.apache.oozie.util.Instrumentation;
041 import org.apache.oozie.util.ParamChecker;
042 import org.apache.oozie.util.XLog;
043 import org.apache.oozie.workflow.WorkflowException;
044 import org.apache.openjpa.persistence.OpenJPAPersistence;
045 import org.apache.openjpa.persistence.OpenJPAQuery;
046 import org.apache.openjpa.persistence.jdbc.FetchDirection;
047 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
048 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
049 import org.apache.openjpa.persistence.jdbc.ResultSetType;
050
051 /**
052 * DB Implementation of Coord Store
053 */
054 public class CoordinatorStore extends Store {
055 private final XLog log = XLog.getLog(getClass());
056
057 private EntityManager entityManager;
058 private static final String INSTR_GROUP = "db";
059 public static final int LOCK_TIMEOUT = 50000;
060 private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
061
062 public CoordinatorStore(boolean selectForUpdate) throws StoreException {
063 super();
064 entityManager = getEntityManager();
065 }
066
067 public CoordinatorStore(Store store, boolean selectForUpdate) throws StoreException {
068 super(store);
069 entityManager = getEntityManager();
070 }
071
072 /**
073 * Create a CoordJobBean. It also creates the process instance for the job.
074 *
075 * @param workflow workflow bean
076 * @throws StoreException
077 */
078
079 public void insertCoordinatorJob(final CoordinatorJobBean coordinatorJob) throws StoreException {
080 ParamChecker.notNull(coordinatorJob, "coordinatorJob");
081
082 doOperation("insertCoordinatorJob", new Callable<Void>() {
083 public Void call() throws StoreException {
084 entityManager.persist(coordinatorJob);
085 return null;
086 }
087 });
088 }
089
090 /**
091 * Load the CoordinatorJob into a Bean and return it. Also load the Workflow Instance into the bean. And lock the
092 * Workflow depending on the locking parameter.
093 *
094 * @param id Job ID
095 * @param locking Flag for Table Lock
096 * @return CoordinatorJobBean
097 * @throws StoreException
098 */
099 public CoordinatorJobBean getCoordinatorJob(final String id, final boolean locking) throws StoreException {
100 ParamChecker.notEmpty(id, "CoordJobId");
101 CoordinatorJobBean cjBean = doOperation("getCoordinatorJob", new Callable<CoordinatorJobBean>() {
102 @SuppressWarnings("unchecked")
103 public CoordinatorJobBean call() throws StoreException {
104 Query q = entityManager.createNamedQuery("GET_COORD_JOB");
105 q.setParameter("id", id);
106 /*
107 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
108 * // q.setHint("openjpa.FetchPlan.ReadLockMode","WRITE");
109 * FetchPlan fetch = oq.getFetchPlan();
110 * fetch.setReadLockMode(LockModeType.WRITE);
111 * fetch.setLockTimeout(-1); // 1 second }
112 */
113 List<CoordinatorJobBean> cjBeans = q.getResultList();
114
115 if (cjBeans.size() > 0) {
116 return cjBeans.get(0);
117 }
118 else {
119 throw new StoreException(ErrorCode.E0604, id);
120 }
121 }
122 });
123
124 cjBean.setStatus(cjBean.getStatus());
125 return cjBean;
126 }
127
128 /**
129 * Get a list of Coordinator Jobs that should be materialized. Jobs with a 'last materialized time' older than the
130 * argument will be returned.
131 *
132 * @param d Date
133 * @return List of Coordinator Jobs that have a last materialized time older than input date
134 * @throws StoreException
135 */
136 public List<CoordinatorJobBean> getCoordinatorJobsToBeMaterialized(final Date d, final int limit)
137 throws StoreException {
138
139 ParamChecker.notNull(d, "Coord Job Materialization Date");
140 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsToBeMaterialized",
141 new Callable<List<CoordinatorJobBean>>() {
142 public List<CoordinatorJobBean> call() throws StoreException {
143
144 List<CoordinatorJobBean> cjBeans;
145 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
146 try {
147 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN");
148 q.setParameter("matTime", new Timestamp(d.getTime()));
149 if (limit > 0) {
150 q.setMaxResults(limit);
151 }
152 /*
153 OpenJPAQuery oq = OpenJPAPersistence.cast(q);
154 FetchPlan fetch = oq.getFetchPlan();
155 fetch.setReadLockMode(LockModeType.WRITE);
156 fetch.setLockTimeout(-1); // no limit
157 */
158 cjBeans = q.getResultList();
159 // copy results to a new object
160 for (CoordinatorJobBean j : cjBeans) {
161 jobList.add(j);
162 }
163 }
164 catch (IllegalStateException e) {
165 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
166 }
167 return jobList;
168
169 }
170 });
171 return cjBeans;
172 }
173
174 /**
175 * A list of Coordinator Jobs that are matched with the status and have last materialized time' older than
176 * checkAgeSecs will be returned.
177 *
178 * @param checkAgeSecs Job age in Seconds
179 * @param status Coordinator Job Status
180 * @param limit Number of results to return
181 * @param locking Flag for Table Lock
182 * @return List of Coordinator Jobs that are matched with the parameters.
183 * @throws StoreException
184 */
185 public List<CoordinatorJobBean> getCoordinatorJobsOlderThanStatus(final long checkAgeSecs, final String status,
186 final int limit, final boolean locking) throws StoreException {
187
188 ParamChecker.notNull(status, "Coord Job Status");
189 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsOlderThanStatus",
190 new Callable<List<CoordinatorJobBean>>() {
191 public List<CoordinatorJobBean> call() throws StoreException {
192
193 List<CoordinatorJobBean> cjBeans;
194 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
195 try {
196 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN_STATUS");
197 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
198 q.setParameter("lastModTime", ts);
199 q.setParameter("status", status);
200 if (limit > 0) {
201 q.setMaxResults(limit);
202 }
203 /*
204 * if (locking) { OpenJPAQuery oq =
205 * OpenJPAPersistence.cast(q); FetchPlan fetch =
206 * oq.getFetchPlan();
207 * fetch.setReadLockMode(LockModeType.WRITE);
208 * fetch.setLockTimeout(-1); // no limit }
209 */
210 cjBeans = q.getResultList();
211 for (CoordinatorJobBean j : cjBeans) {
212 jobList.add(j);
213 }
214 }
215 catch (Exception e) {
216 throw new StoreException(ErrorCode.E0603, e.getMessage(), e);
217 }
218 return jobList;
219
220 }
221 });
222 return cjBeans;
223 }
224
225 /**
226 * Load the CoordinatorAction into a Bean and return it.
227 *
228 * @param id action ID
229 * @return CoordinatorActionBean
230 * @throws StoreException
231 */
232 public CoordinatorActionBean getCoordinatorAction(final String id, final boolean locking) throws StoreException {
233 ParamChecker.notEmpty(id, "actionID");
234 CoordinatorActionBean caBean = doOperation("getCoordinatorAction", new Callable<CoordinatorActionBean>() {
235 public CoordinatorActionBean call() throws StoreException {
236 Query q = entityManager.createNamedQuery("GET_COORD_ACTION");
237 q.setParameter("id", id);
238 OpenJPAQuery oq = OpenJPAPersistence.cast(q);
239 /*
240 * if (locking) { //q.setHint("openjpa.FetchPlan.ReadLockMode",
241 * "WRITE"); FetchPlan fetch = oq.getFetchPlan();
242 * fetch.setReadLockMode(LockModeType.WRITE);
243 * fetch.setLockTimeout(-1); // no limit }
244 */
245
246 CoordinatorActionBean action = null;
247 List<CoordinatorActionBean> actions = q.getResultList();
248 if (actions.size() > 0) {
249 action = actions.get(0);
250 }
251 else {
252 throw new StoreException(ErrorCode.E0605, id);
253 }
254
255 /*
256 * if (locking) return action; else
257 */
258 return getBeanForRunningCoordAction(action);
259 }
260 });
261 return caBean;
262 }
263
264 /**
265 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
266 * concurrency number. Sort returned actions based on execution order (FIFO, LIFO, LAST_ONLY)
267 *
268 * @param id job ID
269 * @param numResults number of results to return
270 * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY
271 * @return List of CoordinatorActionBean
272 * @throws StoreException
273 */
274 public List<CoordinatorActionBean> getCoordinatorActionsForJob(final String id, final int numResults,
275 final String executionOrder) throws StoreException {
276 ParamChecker.notEmpty(id, "jobID");
277 List<CoordinatorActionBean> caBeans = doOperation("getCoordinatorActionsForJob",
278 new Callable<List<CoordinatorActionBean>>() {
279 public List<CoordinatorActionBean> call() throws StoreException {
280
281 List<CoordinatorActionBean> caBeans;
282 Query q;
283 // check if executionOrder is FIFO, LIFO, or LAST_ONLY
284 if (executionOrder.equalsIgnoreCase("FIFO")) {
285 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
286 }
287 else {
288 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO");
289 }
290 q.setParameter("jobId", id);
291 // if executionOrder is LAST_ONLY, only retrieve first
292 // record in LIFO,
293 // otherwise, use numResults if it is positive.
294 if (executionOrder.equalsIgnoreCase("LAST_ONLY")) {
295 q.setMaxResults(1);
296 }
297 else {
298 if (numResults > 0) {
299 q.setMaxResults(numResults);
300 }
301 }
302 caBeans = q.getResultList();
303 return caBeans;
304 }
305 });
306 return caBeans;
307 }
308
309 /**
310 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
311 * concurrency number.
312 *
313 * @param id job ID
314 * @return Number of running actions
315 * @throws StoreException
316 */
317 public int getCoordinatorRunningActionsCount(final String id) throws StoreException {
318 ParamChecker.notEmpty(id, "jobID");
319 Integer cnt = doOperation("getCoordinatorRunningActionsCount", new Callable<Integer>() {
320 public Integer call() throws SQLException {
321
322 Query q = entityManager.createNamedQuery("GET_COORD_RUNNING_ACTIONS_COUNT");
323
324 q.setParameter("jobId", id);
325 Long count = (Long) q.getSingleResult();
326 return Integer.valueOf(count.intValue());
327 }
328 });
329 return cnt.intValue();
330 }
331
332 /**
333 * Create a new Action record in the ACTIONS table with the given Bean.
334 *
335 * @param action WorkflowActionBean
336 * @throws StoreException If the action is already present
337 */
338 public void insertCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
339 ParamChecker.notNull(action, "CoordinatorActionBean");
340 doOperation("insertCoordinatorAction", new Callable<Void>() {
341 public Void call() throws StoreException {
342 entityManager.persist(action);
343 return null;
344 }
345 });
346 }
347
348 /**
349 * Update the given action bean to DB.
350 *
351 * @param action Action Bean
352 * @throws StoreException if action doesn't exist
353 */
354 public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
355 ParamChecker.notNull(action, "CoordinatorActionBean");
356 doOperation("updateCoordinatorAction", new Callable<Void>() {
357 public Void call() throws StoreException {
358 Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION");
359 q.setParameter("id", action.getId());
360 setActionQueryParameters(action, q);
361 q.executeUpdate();
362 return null;
363 }
364 });
365 }
366
367 /**
368 * Update the given action bean to DB.
369 *
370 * @param action Action Bean
371 * @throws StoreException if action doesn't exist
372 */
373 public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException {
374 ParamChecker.notNull(action, "CoordinatorActionBean");
375 doOperation("updateCoordinatorAction", new Callable<Void>() {
376 public Void call() throws StoreException {
377 Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION_MIN");
378 q.setParameter("id", action.getId());
379 q.setParameter("missingDependencies", action.getMissingDependencies());
380 q.setParameter("lastModifiedTime", new Date());
381 q.setParameter("status", action.getStatus().toString());
382 q.setParameter("actionXml", action.getActionXml());
383 q.executeUpdate();
384 return null;
385 }
386 });
387 }
388
389 /**
390 * Update the given coordinator job bean to DB.
391 *
392 * @param jobbean Coordinator Job Bean
393 * @throws StoreException if action doesn't exist
394 */
395 public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException {
396 ParamChecker.notNull(job, "CoordinatorJobBean");
397 doOperation("updateJob", new Callable<Void>() {
398 public Void call() throws StoreException {
399 Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB");
400 q.setParameter("id", job.getId());
401 setJobQueryParameters(job, q);
402 q.executeUpdate();
403 return null;
404 }
405 });
406 }
407
408 public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException {
409 ParamChecker.notNull(job, "CoordinatorJobBean");
410 doOperation("updateJobStatus", new Callable<Void>() {
411 public Void call() throws StoreException {
412 Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB_STATUS");
413 q.setParameter("id", job.getId());
414 q.setParameter("status", job.getStatus().toString());
415 q.setParameter("lastModifiedTime", new Date());
416 q.executeUpdate();
417 return null;
418 }
419 });
420 }
421
422 private <V> V doOperation(String name, Callable<V> command) throws StoreException {
423 try {
424 Instrumentation.Cron cron = new Instrumentation.Cron();
425 cron.start();
426 V retVal;
427 try {
428 retVal = command.call();
429 }
430 finally {
431 cron.stop();
432 }
433 Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
434 return retVal;
435 }
436 catch (StoreException ex) {
437 throw ex;
438 }
439 catch (SQLException ex) {
440 throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
441 }
442 catch (Exception e) {
443 throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
444 }
445 }
446
447 private void setJobQueryParameters(CoordinatorJobBean jBean, Query q) {
448 q.setParameter("appName", jBean.getAppName());
449 q.setParameter("appPath", jBean.getAppPath());
450 q.setParameter("concurrency", jBean.getConcurrency());
451 q.setParameter("conf", jBean.getConf());
452 q.setParameter("externalId", jBean.getExternalId());
453 q.setParameter("frequency", jBean.getFrequency());
454 q.setParameter("lastActionNumber", jBean.getLastActionNumber());
455 q.setParameter("timeOut", jBean.getTimeout());
456 q.setParameter("timeZone", jBean.getTimeZone());
457 q.setParameter("authToken", jBean.getAuthToken());
458 q.setParameter("createdTime", jBean.getCreatedTimestamp());
459 q.setParameter("endTime", jBean.getEndTimestamp());
460 q.setParameter("execution", jBean.getExecution());
461 q.setParameter("jobXml", jBean.getJobXml());
462 q.setParameter("lastAction", jBean.getLastActionTimestamp());
463 q.setParameter("lastModifiedTime", new Date());
464 q.setParameter("nextMaterializedTime", jBean.getNextMaterializedTimestamp());
465 q.setParameter("origJobXml", jBean.getOrigJobXml());
466 q.setParameter("slaXml", jBean.getSlaXml());
467 q.setParameter("startTime", jBean.getStartTimestamp());
468 q.setParameter("status", jBean.getStatus().toString());
469 q.setParameter("timeUnit", jBean.getTimeUnitStr());
470 }
471
472 private void setActionQueryParameters(CoordinatorActionBean aBean, Query q) {
473 q.setParameter("actionNumber", aBean.getActionNumber());
474 q.setParameter("actionXml", aBean.getActionXml());
475 q.setParameter("consoleUrl", aBean.getConsoleUrl());
476 q.setParameter("createdConf", aBean.getCreatedConf());
477 q.setParameter("errorCode", aBean.getErrorCode());
478 q.setParameter("errorMessage", aBean.getErrorMessage());
479 q.setParameter("externalStatus", aBean.getExternalStatus());
480 q.setParameter("missingDependencies", aBean.getMissingDependencies());
481 q.setParameter("runConf", aBean.getRunConf());
482 q.setParameter("timeOut", aBean.getTimeOut());
483 q.setParameter("trackerUri", aBean.getTrackerUri());
484 q.setParameter("type", aBean.getType());
485 q.setParameter("createdTime", aBean.getCreatedTimestamp());
486 q.setParameter("externalId", aBean.getExternalId());
487 q.setParameter("jobId", aBean.getJobId());
488 q.setParameter("lastModifiedTime", new Date());
489 q.setParameter("nominalTime", aBean.getNominalTimestamp());
490 q.setParameter("slaXml", aBean.getSlaXml());
491 q.setParameter("status", aBean.getStatus().toString());
492 }
493
494
495 /**
496 * Purge the coordinators completed older than given days.
497 *
498 * @param olderThanDays number of days for which to preserve the coordinators
499 * @param limit maximum number of coordinator jobs to be purged
500 * @throws StoreException
501 */
502 public void purge(final long olderThanDays, final int limit) throws StoreException {
503 doOperation("coord-purge", new Callable<Void>() {
504 public Void call() throws SQLException, StoreException, WorkflowException {
505 Timestamp lastModTm = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
506 Query jobQ = entityManager.createNamedQuery("GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS");
507 jobQ.setParameter("lastModTime", lastModTm);
508 jobQ.setMaxResults(limit);
509 List<CoordinatorJobBean> coordJobs = jobQ.getResultList();
510
511 int actionDeleted = 0;
512 if (coordJobs.size() != 0) {
513 for (CoordinatorJobBean coord : coordJobs) {
514 String jobId = coord.getId();
515 entityManager.remove(coord);
516 Query g = entityManager.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR");
517 g.setParameter("jobId", jobId);
518 actionDeleted += g.executeUpdate();
519 }
520 }
521
522 XLog.getLog(getClass()).debug("ENDED Coord Purge deleted jobs :" + coordJobs.size() + " and actions " + actionDeleted);
523 return null;
524 }
525 });
526 }
527
528 public void commit() throws StoreException {
529 }
530
531 public void close() throws StoreException {
532 }
533
534 public CoordinatorJobBean getCoordinatorJobs(String id) {
535 // TODO Auto-generated method stub
536 return null;
537 }
538
539 public CoordinatorJobInfo getCoordinatorInfo(final Map<String, List<String>> filter, final int start, final int len)
540 throws StoreException {
541
542 CoordinatorJobInfo coordJobInfo = doOperation("getCoordinatorJobInfo", new Callable<CoordinatorJobInfo>() {
543 public CoordinatorJobInfo call() throws SQLException, StoreException {
544 List<String> orArray = new ArrayList<String>();
545 List<String> colArray = new ArrayList<String>();
546 List<String> valArray = new ArrayList<String>();
547 StringBuilder sb = new StringBuilder("");
548
549 StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr,
550 StoreStatusFilter.coordCountStr);
551
552 int realLen = 0;
553
554 Query q = null;
555 Query qTotal = null;
556 if (orArray.size() == 0) {
557 q = entityManager.createNamedQuery("GET_COORD_JOBS_COLUMNS");
558 q.setFirstResult(start - 1);
559 q.setMaxResults(len);
560 qTotal = entityManager.createNamedQuery("GET_COORD_JOBS_COUNT");
561 }
562 else {
563 StringBuilder sbTotal = new StringBuilder(sb);
564 sb.append(" order by w.createdTimestamp desc ");
565 XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
566 q = entityManager.createQuery(sb.toString());
567 q.setFirstResult(start - 1);
568 q.setMaxResults(len);
569 qTotal = entityManager.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr,
570 StoreStatusFilter.coordCountStr));
571 }
572
573 for (int i = 0; i < orArray.size(); i++) {
574 q.setParameter(colArray.get(i), valArray.get(i));
575 qTotal.setParameter(colArray.get(i), valArray.get(i));
576 }
577
578 OpenJPAQuery kq = OpenJPAPersistence.cast(q);
579 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
580 fetch.setFetchBatchSize(20);
581 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
582 fetch.setFetchDirection(FetchDirection.FORWARD);
583 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
584 List<?> resultList = q.getResultList();
585 List<Object[]> objectArrList = (List<Object[]>) resultList;
586 List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>();
587
588 for (Object[] arr : objectArrList) {
589 CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr);
590 coordBeansList.add(ww);
591 }
592
593 realLen = ((Long) qTotal.getSingleResult()).intValue();
594
595 return new CoordinatorJobInfo(coordBeansList, start, len, realLen);
596 }
597 });
598 return coordJobInfo;
599 }
600
601 private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) {
602 CoordinatorJobBean bean = new CoordinatorJobBean();
603 bean.setId((String) arr[0]);
604 if (arr[1] != null) {
605 bean.setAppName((String) arr[1]);
606 }
607 if (arr[2] != null) {
608 bean.setStatus(Status.valueOf((String) arr[2]));
609 }
610 if (arr[3] != null) {
611 bean.setUser((String) arr[3]);
612 }
613 if (arr[4] != null) {
614 bean.setGroup((String) arr[4]);
615 }
616 if (arr[5] != null) {
617 bean.setStartTime((Timestamp) arr[5]);
618 }
619 if (arr[6] != null) {
620 bean.setEndTime((Timestamp) arr[6]);
621 }
622 if (arr[7] != null) {
623 bean.setAppPath((String) arr[7]);
624 }
625 if (arr[8] != null) {
626 bean.setConcurrency(((Integer) arr[8]).intValue());
627 }
628 if (arr[9] != null) {
629 bean.setFrequency(((Integer) arr[9]).intValue());
630 }
631 if (arr[10] != null) {
632 bean.setLastActionTime((Timestamp) arr[10]);
633 }
634 if (arr[11] != null) {
635 bean.setNextMaterializedTime((Timestamp) arr[11]);
636 }
637 if (arr[13] != null) {
638 bean.setTimeUnit(Timeunit.valueOf((String) arr[13]));
639 }
640 if (arr[14] != null) {
641 bean.setTimeZone((String) arr[14]);
642 }
643 if (arr[15] != null) {
644 bean.setTimeout((Integer) arr[15]);
645 }
646 return bean;
647 }
648
649 /**
650 * Loads all actions for the given Coordinator job.
651 *
652 * @param jobId coordinator job id
653 * @param locking true if Actions are to be locked
654 * @return A List of CoordinatorActionBean
655 * @throws StoreException
656 */
657 public Integer getActionsForCoordinatorJob(final String jobId, final boolean locking)
658 throws StoreException {
659 ParamChecker.notEmpty(jobId, "CoordinatorJobID");
660 Integer actionsCount = doOperation("getActionsForCoordinatorJob",
661 new Callable<Integer>() {
662 @SuppressWarnings("unchecked")
663 public Integer call() throws StoreException {
664 List<CoordinatorActionBean> actions;
665 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
666 try {
667 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
668 q.setParameter("jobId", jobId);
669 /*
670 * if (locking) { //
671 * q.setHint("openjpa.FetchPlan.ReadLockMode", //
672 * "READ"); OpenJPAQuery oq =
673 * OpenJPAPersistence.cast(q); JDBCFetchPlan fetch =
674 * (JDBCFetchPlan) oq.getFetchPlan();
675 * fetch.setReadLockMode(LockModeType.WRITE);
676 * fetch.setLockTimeout(-1); // 1 second }
677 */
678 Long count = (Long) q.getSingleResult();
679 return Integer.valueOf(count.intValue());
680 /*actions = q.getResultList();
681 for (CoordinatorActionBean a : actions) {
682 CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
683 actionList.add(aa);
684 }*/
685 }
686 catch (IllegalStateException e) {
687 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
688 }
689 /*
690 * if (locking) { return actions; } else {
691 */
692
693 // }
694 }
695 });
696 return actionsCount;
697 }
698
699 /**
700 * Loads given number of actions for the given Coordinator job.
701 *
702 * @param jobId coordinator job id
703 * @param start offset for select statement
704 * @param len number of Workflow Actions to be returned
705 * @return A List of CoordinatorActionBean
706 * @throws StoreException
707 */
708 public List<CoordinatorActionBean> getActionsSubsetForCoordinatorJob(final String jobId, final int start,
709 final int len) throws StoreException {
710 ParamChecker.notEmpty(jobId, "CoordinatorJobID");
711 List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob",
712 new Callable<List<CoordinatorActionBean>>() {
713 @SuppressWarnings("unchecked")
714 public List<CoordinatorActionBean> call() throws StoreException {
715 List<CoordinatorActionBean> actions;
716 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
717 try {
718 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
719 q.setParameter("jobId", jobId);
720 q.setFirstResult(start - 1);
721 q.setMaxResults(len);
722 actions = q.getResultList();
723 for (CoordinatorActionBean a : actions) {
724 CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
725 actionList.add(aa);
726 }
727 }
728 catch (IllegalStateException e) {
729 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
730 }
731 return actionList;
732 }
733 });
734 return actions;
735 }
736
737 protected CoordinatorActionBean getBeanForRunningCoordAction(CoordinatorActionBean a) {
738 if (a != null) {
739 CoordinatorActionBean action = new CoordinatorActionBean();
740 action.setId(a.getId());
741 action.setActionNumber(a.getActionNumber());
742 action.setActionXml(a.getActionXml());
743 action.setConsoleUrl(a.getConsoleUrl());
744 action.setCreatedConf(a.getCreatedConf());
745 //action.setErrorCode(a.getErrorCode());
746 //action.setErrorMessage(a.getErrorMessage());
747 action.setExternalStatus(a.getExternalStatus());
748 action.setMissingDependencies(a.getMissingDependencies());
749 action.setRunConf(a.getRunConf());
750 action.setTimeOut(a.getTimeOut());
751 action.setTrackerUri(a.getTrackerUri());
752 action.setType(a.getType());
753 action.setCreatedTime(a.getCreatedTime());
754 action.setExternalId(a.getExternalId());
755 action.setJobId(a.getJobId());
756 action.setLastModifiedTime(a.getLastModifiedTime());
757 action.setNominalTime(a.getNominalTime());
758 action.setSlaXml(a.getSlaXml());
759 action.setStatus(a.getStatus());
760 return action;
761 }
762 return null;
763 }
764
765 public CoordinatorActionBean getAction(String id, boolean b) {
766 return null;
767 }
768
769
770 public List<CoordinatorActionBean> getRunningActionsForCoordinatorJob(final String jobId, final boolean locking)
771 throws StoreException {
772 ParamChecker.notEmpty(jobId, "CoordinatorJobID");
773 List<CoordinatorActionBean> actions = doOperation("getRunningActionsForCoordinatorJob",
774 new Callable<List<CoordinatorActionBean>>() {
775 @SuppressWarnings("unchecked")
776 public List<CoordinatorActionBean> call() throws StoreException {
777 List<CoordinatorActionBean> actions;
778 try {
779 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_FOR_COORD_JOB");
780 q.setParameter("jobId", jobId);
781 /*
782 * if (locking) {
783 * q.setHint("openjpa.FetchPlan.ReadLockMode",
784 * "READ"); OpenJPAQuery oq =
785 * OpenJPAPersistence.cast(q); FetchPlan fetch =
786 * oq.getFetchPlan();
787 * fetch.setReadLockMode(LockModeType.WRITE);
788 * fetch.setLockTimeout(-1); // no limit }
789 */
790 actions = q.getResultList();
791 return actions;
792 }
793 catch (IllegalStateException e) {
794 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
795 }
796 }
797 });
798 return actions;
799 }
800
801 public List<CoordinatorActionBean> getRunningActionsOlderThan(final long checkAgeSecs, final boolean locking)
802 throws StoreException {
803 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
804 new Callable<List<CoordinatorActionBean>>() {
805 @SuppressWarnings("unchecked")
806 public List<CoordinatorActionBean> call() throws StoreException {
807 List<CoordinatorActionBean> actions;
808 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
809 try {
810 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_OLDER_THAN");
811 q.setParameter("lastModifiedTime", ts);
812 /*
813 * if (locking) { OpenJPAQuery oq =
814 * OpenJPAPersistence.cast(q); FetchPlan fetch =
815 * oq.getFetchPlan();
816 * fetch.setReadLockMode(LockModeType.WRITE);
817 * fetch.setLockTimeout(-1); // no limit }
818 */
819 actions = q.getResultList();
820 return actions;
821 }
822 catch (IllegalStateException e) {
823 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
824 }
825 }
826 });
827 return actions;
828 }
829
830 public List<CoordinatorActionBean> getRecoveryActionsOlderThan(final long checkAgeSecs, final boolean locking)
831 throws StoreException {
832 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
833 new Callable<List<CoordinatorActionBean>>() {
834 @SuppressWarnings("unchecked")
835 public List<CoordinatorActionBean> call() throws StoreException {
836 List<CoordinatorActionBean> actions;
837 try {
838 Query q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN");
839 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
840 q.setParameter("lastModifiedTime", ts);
841 /*
842 * if (locking) { OpenJPAQuery oq =
843 * OpenJPAPersistence.cast(q); FetchPlan fetch =
844 * oq.getFetchPlan();
845 * fetch.setReadLockMode(LockModeType.WRITE);
846 * fetch.setLockTimeout(-1); // no limit }
847 */
848 actions = q.getResultList();
849 return actions;
850 }
851 catch (IllegalStateException e) {
852 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
853 }
854 }
855 });
856 return actions;
857 }
858
859 /**
860 * Get coordinator action beans for given start date and end date
861 *
862 * @param startDate
863 * @param endDate
864 * @return list of coordinator action beans
865 * @throws StoreException
866 */
867 public List<CoordinatorActionBean> getCoordActionsForDates(final String jobId, final Date startDate,
868 final Date endDate)
869 throws StoreException {
870 List<CoordinatorActionBean> actions = doOperation("getCoordActionsForDates",
871 new Callable<List<CoordinatorActionBean>>() {
872 @SuppressWarnings("unchecked")
873 public List<CoordinatorActionBean> call() throws StoreException {
874 List<CoordinatorActionBean> actions;
875 try {
876 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_DATES");
877 q.setParameter("jobId", jobId);
878 q.setParameter("startTime", new Timestamp(startDate.getTime()));
879 q.setParameter("endTime", new Timestamp(endDate.getTime()));
880 actions = q.getResultList();
881
882 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
883 for (CoordinatorActionBean a : actions) {
884 CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
885 actionList.add(aa);
886 }
887 return actionList;
888 }
889 catch (IllegalStateException e) {
890 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
891 }
892 }
893 });
894 return actions;
895 }
896
897 /**
898 * Get coordinator action bean for given date
899 *
900 * @param nominalTime
901 * @return CoordinatorActionBean
902 * @throws StoreException
903 */
904 public CoordinatorActionBean getCoordActionForNominalTime(final String jobId, final Date nominalTime)
905 throws StoreException {
906 CoordinatorActionBean action = doOperation("getCoordActionForNominalTime",
907 new Callable<CoordinatorActionBean>() {
908 @SuppressWarnings("unchecked")
909 public CoordinatorActionBean call() throws StoreException {
910 List<CoordinatorActionBean> actions;
911 Query q = entityManager.createNamedQuery("GET_ACTION_FOR_NOMINALTIME");
912 q.setParameter("jobId", jobId);
913 q.setParameter("nominalTime", new Timestamp(nominalTime.getTime()));
914 actions = q.getResultList();
915
916 CoordinatorActionBean action = null;
917 if (actions.size() > 0) {
918 action = actions.get(0);
919 }
920 else {
921 throw new StoreException(ErrorCode.E0605, DateUtils.formatDateOozieTZ(nominalTime));
922 }
923 return getBeanForRunningCoordAction(action);
924 }
925 });
926 return action;
927 }
928
929 public List<String> getRecoveryActionsGroupByJobId(final long checkAgeSecs) throws StoreException {
930 List<String> jobids = doOperation("getRecoveryActionsGroupByJobId", new Callable<List<String>>() {
931 @SuppressWarnings("unchecked")
932 public List<String> call() throws StoreException {
933 List<String> jobids = new ArrayList<String>();
934 try {
935 Query q = entityManager.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID");
936 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
937 q.setParameter(1, ts);
938 List<Object[]> list = q.getResultList();
939
940 for (Object[] arr : list) {
941 if (arr != null && arr[0] != null) {
942 jobids.add((String) arr[0]);
943 }
944 }
945
946 return jobids;
947 }
948 catch (IllegalStateException e) {
949 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
950 }
951 }
952 });
953 return jobids;
954 }
955 }