This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.store;
019
020 import java.sql.SQLException;
021 import java.sql.Timestamp;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.concurrent.Callable;
027
028 import javax.persistence.EntityManager;
029 import javax.persistence.Query;
030
031 import org.apache.oozie.CoordinatorActionBean;
032 import org.apache.oozie.CoordinatorJobBean;
033 import org.apache.oozie.CoordinatorJobInfo;
034 import org.apache.oozie.ErrorCode;
035 import org.apache.oozie.client.Job.Status;
036 import org.apache.oozie.client.CoordinatorJob.Timeunit;
037 import org.apache.oozie.service.InstrumentationService;
038 import org.apache.oozie.service.Services;
039 import org.apache.oozie.util.DateUtils;
040 import org.apache.oozie.util.Instrumentation;
041 import org.apache.oozie.util.ParamChecker;
042 import org.apache.oozie.util.XLog;
043 import org.apache.oozie.workflow.WorkflowException;
044 import org.apache.openjpa.persistence.OpenJPAPersistence;
045 import org.apache.openjpa.persistence.OpenJPAQuery;
046 import org.apache.openjpa.persistence.jdbc.FetchDirection;
047 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
048 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
049 import org.apache.openjpa.persistence.jdbc.ResultSetType;
050
051 /**
052 * DB Implementation of Coord Store
053 */
054 public class CoordinatorStore extends Store {
055 private final XLog log = XLog.getLog(getClass());
056
057 private EntityManager entityManager;
058 private static final String INSTR_GROUP = "db";
059 public static final int LOCK_TIMEOUT = 50000;
060 private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
061
/**
 * Creates a store using the no-argument {@link Store} constructor and caches its entity manager.
 *
 * @param selectForUpdate not read by this constructor
 * @throws StoreException declared by the signature; presumably raised by the superclass set-up - confirm
 *         against {@link Store}
 */
public CoordinatorStore(boolean selectForUpdate) throws StoreException {
    // The implicit super() call is equivalent to the explicit one.
    this.entityManager = getEntityManager();
}
066
/**
 * Creates a store on top of an existing {@link Store} and caches the resulting entity manager.
 *
 * @param store store handed to the {@link Store} constructor
 * @param selectForUpdate not read by this constructor
 * @throws StoreException declared by the signature; presumably raised by the superclass set-up - confirm
 *         against {@link Store}
 */
public CoordinatorStore(Store store, boolean selectForUpdate) throws StoreException {
    super(store);
    this.entityManager = getEntityManager();
}
071
072 /**
073 * Create a CoordJobBean. It also creates the process instance for the job.
074 *
075 * @param workflow workflow bean
076 * @throws StoreException
077 */
078
079 public void insertCoordinatorJob(final CoordinatorJobBean coordinatorJob) throws StoreException {
080 ParamChecker.notNull(coordinatorJob, "coordinatorJob");
081
082 doOperation("insertCoordinatorJob", new Callable<Void>() {
083 public Void call() throws StoreException {
084 entityManager.persist(coordinatorJob);
085 return null;
086 }
087 });
088 }
089
090 /**
091 * Load the CoordinatorJob into a Bean and return it. Also load the Workflow Instance into the bean. And lock the
092 * Workflow depending on the locking parameter.
093 *
094 * @param id Job ID
095 * @param locking Flag for Table Lock
096 * @return CoordinatorJobBean
097 * @throws StoreException
098 */
099 public CoordinatorJobBean getCoordinatorJob(final String id, final boolean locking) throws StoreException {
100 ParamChecker.notEmpty(id, "CoordJobId");
101 CoordinatorJobBean cjBean = doOperation("getCoordinatorJob", new Callable<CoordinatorJobBean>() {
102 @SuppressWarnings("unchecked")
103 public CoordinatorJobBean call() throws StoreException {
104 Query q = entityManager.createNamedQuery("GET_COORD_JOB");
105 q.setParameter("id", id);
106 /*
107 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
108 * // q.setHint("openjpa.FetchPlan.ReadLockMode","WRITE");
109 * FetchPlan fetch = oq.getFetchPlan();
110 * fetch.setReadLockMode(LockModeType.WRITE);
111 * fetch.setLockTimeout(-1); // 1 second }
112 */
113 List<CoordinatorJobBean> cjBeans = q.getResultList();
114
115 if (cjBeans.size() > 0) {
116 return cjBeans.get(0);
117 }
118 else {
119 throw new StoreException(ErrorCode.E0604, id);
120 }
121 }
122 });
123
124 cjBean.setStatus(cjBean.getStatus());
125 return cjBean;
126 }
127
128 /**
129 * Get a list of Coordinator Jobs that should be materialized. Jobs with a 'last materialized time' older than the
130 * argument will be returned.
131 *
132 * @param d Date
133 * @return List of Coordinator Jobs that have a last materialized time older than input date
134 * @throws StoreException
135 */
136 public List<CoordinatorJobBean> getCoordinatorJobsToBeMaterialized(final Date d, final int limit)
137 throws StoreException {
138
139 ParamChecker.notNull(d, "Coord Job Materialization Date");
140 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsToBeMaterialized",
141 new Callable<List<CoordinatorJobBean>>() {
142 public List<CoordinatorJobBean> call() throws StoreException {
143
144 List<CoordinatorJobBean> cjBeans;
145 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
146 try {
147 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN");
148 q.setParameter("matTime", new Timestamp(d.getTime()));
149 if (limit > 0) {
150 q.setMaxResults(limit);
151 }
152 /*
153 OpenJPAQuery oq = OpenJPAPersistence.cast(q);
154 FetchPlan fetch = oq.getFetchPlan();
155 fetch.setReadLockMode(LockModeType.WRITE);
156 fetch.setLockTimeout(-1); // no limit
157 */
158 cjBeans = q.getResultList();
159 // copy results to a new object
160 for (CoordinatorJobBean j : cjBeans) {
161 jobList.add(j);
162 }
163 }
164 catch (IllegalStateException e) {
165 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
166 }
167 return jobList;
168
169 }
170 });
171 return cjBeans;
172 }
173
174 /**
175 * A list of Coordinator Jobs that are matched with the status and have last materialized time' older than
176 * checkAgeSecs will be returned.
177 *
178 * @param checkAgeSecs Job age in Seconds
179 * @param status Coordinator Job Status
180 * @param limit Number of results to return
181 * @param locking Flag for Table Lock
182 * @return List of Coordinator Jobs that are matched with the parameters.
183 * @throws StoreException
184 */
185 public List<CoordinatorJobBean> getCoordinatorJobsOlderThanStatus(final long checkAgeSecs, final String status,
186 final int limit, final boolean locking) throws StoreException {
187
188 ParamChecker.notNull(status, "Coord Job Status");
189 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsOlderThanStatus",
190 new Callable<List<CoordinatorJobBean>>() {
191 public List<CoordinatorJobBean> call() throws StoreException {
192
193 List<CoordinatorJobBean> cjBeans;
194 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>();
195 try {
196 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN_STATUS");
197 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
198 q.setParameter("lastModTime", ts);
199 q.setParameter("status", status);
200 if (limit > 0) {
201 q.setMaxResults(limit);
202 }
203 /*
204 * if (locking) { OpenJPAQuery oq =
205 * OpenJPAPersistence.cast(q); FetchPlan fetch =
206 * oq.getFetchPlan();
207 * fetch.setReadLockMode(LockModeType.WRITE);
208 * fetch.setLockTimeout(-1); // no limit }
209 */
210 cjBeans = q.getResultList();
211 for (CoordinatorJobBean j : cjBeans) {
212 jobList.add(j);
213 }
214 }
215 catch (Exception e) {
216 throw new StoreException(ErrorCode.E0603, e.getMessage(), e);
217 }
218 return jobList;
219
220 }
221 });
222 return cjBeans;
223 }
224
225 /**
226 * Load the CoordinatorAction into a Bean and return it.
227 *
228 * @param id action ID
229 * @return CoordinatorActionBean
230 * @throws StoreException
231 */
232 public CoordinatorActionBean getCoordinatorAction(final String id, final boolean locking) throws StoreException {
233 ParamChecker.notEmpty(id, "actionID");
234 CoordinatorActionBean caBean = doOperation("getCoordinatorAction", new Callable<CoordinatorActionBean>() {
235 public CoordinatorActionBean call() throws StoreException {
236 Query q = entityManager.createNamedQuery("GET_COORD_ACTION");
237 q.setParameter("id", id);
238 OpenJPAQuery oq = OpenJPAPersistence.cast(q);
239 /*
240 * if (locking) { //q.setHint("openjpa.FetchPlan.ReadLockMode",
241 * "WRITE"); FetchPlan fetch = oq.getFetchPlan();
242 * fetch.setReadLockMode(LockModeType.WRITE);
243 * fetch.setLockTimeout(-1); // no limit }
244 */
245
246 CoordinatorActionBean action = null;
247 List<CoordinatorActionBean> actions = q.getResultList();
248 if (actions.size() > 0) {
249 action = actions.get(0);
250 }
251 else {
252 throw new StoreException(ErrorCode.E0605, id);
253 }
254
255 /*
256 * if (locking) return action; else
257 */
258 return getBeanForRunningCoordAction(action);
259 }
260 });
261 return caBean;
262 }
263
264 /**
265 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
266 * concurrency number. Sort returned actions based on execution order (FIFO, LIFO, LAST_ONLY)
267 *
268 * @param id job ID
269 * @param numResults number of results to return
270 * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY
271 * @return List of CoordinatorActionBean
272 * @throws StoreException
273 */
274 public List<CoordinatorActionBean> getCoordinatorActionsForJob(final String id, final int numResults,
275 final String executionOrder) throws StoreException {
276 ParamChecker.notEmpty(id, "jobID");
277 List<CoordinatorActionBean> caBeans = doOperation("getCoordinatorActionsForJob",
278 new Callable<List<CoordinatorActionBean>>() {
279 public List<CoordinatorActionBean> call() throws StoreException {
280
281 List<CoordinatorActionBean> caBeans;
282 Query q;
283 // check if executionOrder is FIFO, LIFO, or LAST_ONLY
284 if (executionOrder.equalsIgnoreCase("FIFO")) {
285 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
286 }
287 else {
288 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO");
289 }
290 q.setParameter("jobId", id);
291 // if executionOrder is LAST_ONLY, only retrieve first
292 // record in LIFO,
293 // otherwise, use numResults if it is positive.
294 if (executionOrder.equalsIgnoreCase("LAST_ONLY")) {
295 q.setMaxResults(1);
296 }
297 else {
298 if (numResults > 0) {
299 q.setMaxResults(numResults);
300 }
301 }
302 caBeans = q.getResultList();
303 return caBeans;
304 }
305 });
306 return caBeans;
307 }
308
309 /**
310 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <=
311 * concurrency number.
312 *
313 * @param id job ID
314 * @return Number of running actions
315 * @throws StoreException
316 */
317 public int getCoordinatorRunningActionsCount(final String id) throws StoreException {
318 ParamChecker.notEmpty(id, "jobID");
319 Integer cnt = doOperation("getCoordinatorRunningActionsCount", new Callable<Integer>() {
320 public Integer call() throws SQLException {
321
322 Query q = entityManager.createNamedQuery("GET_COORD_RUNNING_ACTIONS_COUNT");
323
324 q.setParameter("jobId", id);
325 Long count = (Long) q.getSingleResult();
326 return Integer.valueOf(count.intValue());
327 }
328 });
329 return cnt.intValue();
330 }
331
332 /**
333 * Create a new Action record in the ACTIONS table with the given Bean.
334 *
335 * @param action WorkflowActionBean
336 * @throws StoreException If the action is already present
337 */
338 public void insertCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
339 ParamChecker.notNull(action, "CoordinatorActionBean");
340 doOperation("insertCoordinatorAction", new Callable<Void>() {
341 public Void call() throws StoreException {
342 entityManager.persist(action);
343 return null;
344 }
345 });
346 }
347
348 /**
349 * Update the given action bean to DB.
350 *
351 * @param action Action Bean
352 * @throws StoreException if action doesn't exist
353 */
354 public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException {
355 ParamChecker.notNull(action, "CoordinatorActionBean");
356 doOperation("updateCoordinatorAction", new Callable<Void>() {
357 public Void call() throws StoreException {
358 Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION");
359 q.setParameter("id", action.getId());
360 setActionQueryParameters(action, q);
361 q.executeUpdate();
362 return null;
363 }
364 });
365 }
366
367 /**
368 * Update the given action bean to DB.
369 *
370 * @param action Action Bean
371 * @throws StoreException if action doesn't exist
372 */
373 public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException {
374 ParamChecker.notNull(action, "CoordinatorActionBean");
375 doOperation("updateCoordinatorAction", new Callable<Void>() {
376 public Void call() throws StoreException {
377 Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION_MIN");
378 q.setParameter("id", action.getId());
379 q.setParameter("missingDependencies", action.getMissingDependencies());
380 q.setParameter("lastModifiedTime", new Date());
381 q.setParameter("status", action.getStatus().toString());
382 q.setParameter("actionXml", action.getActionXml());
383 q.executeUpdate();
384 return null;
385 }
386 });
387 }
388
389 /**
390 * Update the given coordinator job bean to DB.
391 *
392 * @param jobbean Coordinator Job Bean
393 * @throws StoreException if action doesn't exist
394 */
395 public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException {
396 ParamChecker.notNull(job, "CoordinatorJobBean");
397 doOperation("updateJob", new Callable<Void>() {
398 public Void call() throws StoreException {
399 Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB");
400 q.setParameter("id", job.getId());
401 setJobQueryParameters(job, q);
402 q.executeUpdate();
403 return null;
404 }
405 });
406 }
407
408 public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException {
409 ParamChecker.notNull(job, "CoordinatorJobBean");
410 doOperation("updateJobStatus", new Callable<Void>() {
411 public Void call() throws StoreException {
412 Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB_STATUS");
413 q.setParameter("id", job.getId());
414 q.setParameter("status", job.getStatus().toString());
415 q.setParameter("lastModifiedTime", new Date());
416 q.executeUpdate();
417 return null;
418 }
419 });
420 }
421
422 private <V> V doOperation(String name, Callable<V> command) throws StoreException {
423 try {
424 Instrumentation.Cron cron = new Instrumentation.Cron();
425 cron.start();
426 V retVal;
427 try {
428 retVal = command.call();
429 }
430 finally {
431 cron.stop();
432 }
433 Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
434 return retVal;
435 }
436 catch (StoreException ex) {
437 throw ex;
438 }
439 catch (SQLException ex) {
440 throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
441 }
442 catch (Exception e) {
443 throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
444 }
445 }
446
447 private void setJobQueryParameters(CoordinatorJobBean jBean, Query q) {
448 q.setParameter("appName", jBean.getAppName());
449 q.setParameter("appPath", jBean.getAppPath());
450 q.setParameter("concurrency", jBean.getConcurrency());
451 q.setParameter("conf", jBean.getConf());
452 q.setParameter("externalId", jBean.getExternalId());
453 q.setParameter("frequency", jBean.getFrequency());
454 q.setParameter("lastActionNumber", jBean.getLastActionNumber());
455 q.setParameter("timeOut", jBean.getTimeout());
456 q.setParameter("timeZone", jBean.getTimeZone());
457 q.setParameter("createdTime", jBean.getCreatedTimestamp());
458 q.setParameter("endTime", jBean.getEndTimestamp());
459 q.setParameter("execution", jBean.getExecution());
460 q.setParameter("jobXml", jBean.getJobXml());
461 q.setParameter("lastAction", jBean.getLastActionTimestamp());
462 q.setParameter("lastModifiedTime", new Date());
463 q.setParameter("nextMaterializedTime", jBean.getNextMaterializedTimestamp());
464 q.setParameter("origJobXml", jBean.getOrigJobXml());
465 q.setParameter("slaXml", jBean.getSlaXml());
466 q.setParameter("startTime", jBean.getStartTimestamp());
467 q.setParameter("status", jBean.getStatus().toString());
468 q.setParameter("timeUnit", jBean.getTimeUnitStr());
469 }
470
471 private void setActionQueryParameters(CoordinatorActionBean aBean, Query q) {
472 q.setParameter("actionNumber", aBean.getActionNumber());
473 q.setParameter("actionXml", aBean.getActionXml());
474 q.setParameter("consoleUrl", aBean.getConsoleUrl());
475 q.setParameter("createdConf", aBean.getCreatedConf());
476 q.setParameter("errorCode", aBean.getErrorCode());
477 q.setParameter("errorMessage", aBean.getErrorMessage());
478 q.setParameter("externalStatus", aBean.getExternalStatus());
479 q.setParameter("missingDependencies", aBean.getMissingDependencies());
480 q.setParameter("runConf", aBean.getRunConf());
481 q.setParameter("timeOut", aBean.getTimeOut());
482 q.setParameter("trackerUri", aBean.getTrackerUri());
483 q.setParameter("type", aBean.getType());
484 q.setParameter("createdTime", aBean.getCreatedTimestamp());
485 q.setParameter("externalId", aBean.getExternalId());
486 q.setParameter("jobId", aBean.getJobId());
487 q.setParameter("lastModifiedTime", new Date());
488 q.setParameter("nominalTime", aBean.getNominalTimestamp());
489 q.setParameter("slaXml", aBean.getSlaXml());
490 q.setParameter("status", aBean.getStatus().toString());
491 }
492
493
494 /**
495 * Purge the coordinators completed older than given days.
496 *
497 * @param olderThanDays number of days for which to preserve the coordinators
498 * @param limit maximum number of coordinator jobs to be purged
499 * @throws StoreException
500 */
501 public void purge(final long olderThanDays, final int limit) throws StoreException {
502 doOperation("coord-purge", new Callable<Void>() {
503 public Void call() throws SQLException, StoreException, WorkflowException {
504 Timestamp lastModTm = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
505 Query jobQ = entityManager.createNamedQuery("GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS");
506 jobQ.setParameter("lastModTime", lastModTm);
507 jobQ.setMaxResults(limit);
508 List<CoordinatorJobBean> coordJobs = jobQ.getResultList();
509
510 int actionDeleted = 0;
511 if (coordJobs.size() != 0) {
512 for (CoordinatorJobBean coord : coordJobs) {
513 String jobId = coord.getId();
514 entityManager.remove(coord);
515 Query g = entityManager.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR");
516 g.setParameter("jobId", jobId);
517 actionDeleted += g.executeUpdate();
518 }
519 }
520
521 XLog.getLog(getClass()).debug("ENDED Coord Purge deleted jobs :" + coordJobs.size() + " and actions " + actionDeleted);
522 return null;
523 }
524 });
525 }
526
527 public void commit() throws StoreException {
528 }
529
530 public void close() throws StoreException {
531 }
532
533 public CoordinatorJobBean getCoordinatorJobs(String id) {
534 // TODO Auto-generated method stub
535 return null;
536 }
537
538 public CoordinatorJobInfo getCoordinatorInfo(final Map<String, List<String>> filter, final int start, final int len)
539 throws StoreException {
540
541 CoordinatorJobInfo coordJobInfo = doOperation("getCoordinatorJobInfo", new Callable<CoordinatorJobInfo>() {
542 public CoordinatorJobInfo call() throws SQLException, StoreException {
543 List<String> orArray = new ArrayList<String>();
544 List<String> colArray = new ArrayList<String>();
545 List<String> valArray = new ArrayList<String>();
546 StringBuilder sb = new StringBuilder("");
547
548 StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr,
549 StoreStatusFilter.coordCountStr);
550
551 int realLen = 0;
552
553 Query q = null;
554 Query qTotal = null;
555 if (orArray.size() == 0) {
556 q = entityManager.createNamedQuery("GET_COORD_JOBS_COLUMNS");
557 q.setFirstResult(start - 1);
558 q.setMaxResults(len);
559 qTotal = entityManager.createNamedQuery("GET_COORD_JOBS_COUNT");
560 }
561 else {
562 StringBuilder sbTotal = new StringBuilder(sb);
563 sb.append(" order by w.createdTimestamp desc ");
564 XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
565 q = entityManager.createQuery(sb.toString());
566 q.setFirstResult(start - 1);
567 q.setMaxResults(len);
568 qTotal = entityManager.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr,
569 StoreStatusFilter.coordCountStr));
570 }
571
572 for (int i = 0; i < orArray.size(); i++) {
573 q.setParameter(colArray.get(i), valArray.get(i));
574 qTotal.setParameter(colArray.get(i), valArray.get(i));
575 }
576
577 OpenJPAQuery kq = OpenJPAPersistence.cast(q);
578 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
579 fetch.setFetchBatchSize(20);
580 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
581 fetch.setFetchDirection(FetchDirection.FORWARD);
582 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
583 List<?> resultList = q.getResultList();
584 List<Object[]> objectArrList = (List<Object[]>) resultList;
585 List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>();
586
587 for (Object[] arr : objectArrList) {
588 CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr);
589 coordBeansList.add(ww);
590 }
591
592 realLen = ((Long) qTotal.getSingleResult()).intValue();
593
594 return new CoordinatorJobInfo(coordBeansList, start, len, realLen);
595 }
596 });
597 return coordJobInfo;
598 }
599
600 private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) {
601 CoordinatorJobBean bean = new CoordinatorJobBean();
602 bean.setId((String) arr[0]);
603 if (arr[1] != null) {
604 bean.setAppName((String) arr[1]);
605 }
606 if (arr[2] != null) {
607 bean.setStatus(Status.valueOf((String) arr[2]));
608 }
609 if (arr[3] != null) {
610 bean.setUser((String) arr[3]);
611 }
612 if (arr[4] != null) {
613 bean.setGroup((String) arr[4]);
614 }
615 if (arr[5] != null) {
616 bean.setStartTime((Timestamp) arr[5]);
617 }
618 if (arr[6] != null) {
619 bean.setEndTime((Timestamp) arr[6]);
620 }
621 if (arr[7] != null) {
622 bean.setAppPath((String) arr[7]);
623 }
624 if (arr[8] != null) {
625 bean.setConcurrency(((Integer) arr[8]).intValue());
626 }
627 if (arr[9] != null) {
628 bean.setFrequency((String) arr[9]);
629 }
630 if (arr[10] != null) {
631 bean.setLastActionTime((Timestamp) arr[10]);
632 }
633 if (arr[11] != null) {
634 bean.setNextMaterializedTime((Timestamp) arr[11]);
635 }
636 if (arr[13] != null) {
637 bean.setTimeUnit(Timeunit.valueOf((String) arr[13]));
638 }
639 if (arr[14] != null) {
640 bean.setTimeZone((String) arr[14]);
641 }
642 if (arr[15] != null) {
643 bean.setTimeout((Integer) arr[15]);
644 }
645 return bean;
646 }
647
648 /**
649 * Loads all actions for the given Coordinator job.
650 *
651 * @param jobId coordinator job id
652 * @param locking true if Actions are to be locked
653 * @return A List of CoordinatorActionBean
654 * @throws StoreException
655 */
656 public Integer getActionsForCoordinatorJob(final String jobId, final boolean locking)
657 throws StoreException {
658 ParamChecker.notEmpty(jobId, "CoordinatorJobID");
659 Integer actionsCount = doOperation("getActionsForCoordinatorJob",
660 new Callable<Integer>() {
661 @SuppressWarnings("unchecked")
662 public Integer call() throws StoreException {
663 List<CoordinatorActionBean> actions;
664 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
665 try {
666 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
667 q.setParameter("jobId", jobId);
668 /*
669 * if (locking) { //
670 * q.setHint("openjpa.FetchPlan.ReadLockMode", //
671 * "READ"); OpenJPAQuery oq =
672 * OpenJPAPersistence.cast(q); JDBCFetchPlan fetch =
673 * (JDBCFetchPlan) oq.getFetchPlan();
674 * fetch.setReadLockMode(LockModeType.WRITE);
675 * fetch.setLockTimeout(-1); // 1 second }
676 */
677 Long count = (Long) q.getSingleResult();
678 return Integer.valueOf(count.intValue());
679 /*actions = q.getResultList();
680 for (CoordinatorActionBean a : actions) {
681 CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
682 actionList.add(aa);
683 }*/
684 }
685 catch (IllegalStateException e) {
686 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
687 }
688 /*
689 * if (locking) { return actions; } else {
690 */
691
692 // }
693 }
694 });
695 return actionsCount;
696 }
697
698 /**
699 * Loads given number of actions for the given Coordinator job.
700 *
701 * @param jobId coordinator job id
702 * @param start offset for select statement
703 * @param len number of Workflow Actions to be returned
704 * @return A List of CoordinatorActionBean
705 * @throws StoreException
706 */
707 public List<CoordinatorActionBean> getActionsSubsetForCoordinatorJob(final String jobId, final int start,
708 final int len) throws StoreException {
709 ParamChecker.notEmpty(jobId, "CoordinatorJobID");
710 List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob",
711 new Callable<List<CoordinatorActionBean>>() {
712 @SuppressWarnings("unchecked")
713 public List<CoordinatorActionBean> call() throws StoreException {
714 List<CoordinatorActionBean> actions;
715 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
716 try {
717 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB");
718 q.setParameter("jobId", jobId);
719 q.setFirstResult(start - 1);
720 q.setMaxResults(len);
721 actions = q.getResultList();
722 for (CoordinatorActionBean a : actions) {
723 CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
724 actionList.add(aa);
725 }
726 }
727 catch (IllegalStateException e) {
728 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
729 }
730 return actionList;
731 }
732 });
733 return actions;
734 }
735
736 protected CoordinatorActionBean getBeanForRunningCoordAction(CoordinatorActionBean a) {
737 if (a != null) {
738 CoordinatorActionBean action = new CoordinatorActionBean();
739 action.setId(a.getId());
740 action.setActionNumber(a.getActionNumber());
741 action.setActionXml(a.getActionXml());
742 action.setConsoleUrl(a.getConsoleUrl());
743 action.setCreatedConf(a.getCreatedConf());
744 //action.setErrorCode(a.getErrorCode());
745 //action.setErrorMessage(a.getErrorMessage());
746 action.setExternalStatus(a.getExternalStatus());
747 action.setMissingDependencies(a.getMissingDependencies());
748 action.setRunConf(a.getRunConf());
749 action.setTimeOut(a.getTimeOut());
750 action.setTrackerUri(a.getTrackerUri());
751 action.setType(a.getType());
752 action.setCreatedTime(a.getCreatedTime());
753 action.setExternalId(a.getExternalId());
754 action.setJobId(a.getJobId());
755 action.setLastModifiedTime(a.getLastModifiedTime());
756 action.setNominalTime(a.getNominalTime());
757 action.setSlaXml(a.getSlaXml());
758 action.setStatus(a.getStatus());
759 return action;
760 }
761 return null;
762 }
763
764 public CoordinatorActionBean getAction(String id, boolean b) {
765 return null;
766 }
767
768
769 public List<CoordinatorActionBean> getRunningActionsForCoordinatorJob(final String jobId, final boolean locking)
770 throws StoreException {
771 ParamChecker.notEmpty(jobId, "CoordinatorJobID");
772 List<CoordinatorActionBean> actions = doOperation("getRunningActionsForCoordinatorJob",
773 new Callable<List<CoordinatorActionBean>>() {
774 @SuppressWarnings("unchecked")
775 public List<CoordinatorActionBean> call() throws StoreException {
776 List<CoordinatorActionBean> actions;
777 try {
778 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_FOR_COORD_JOB");
779 q.setParameter("jobId", jobId);
780 /*
781 * if (locking) {
782 * q.setHint("openjpa.FetchPlan.ReadLockMode",
783 * "READ"); OpenJPAQuery oq =
784 * OpenJPAPersistence.cast(q); FetchPlan fetch =
785 * oq.getFetchPlan();
786 * fetch.setReadLockMode(LockModeType.WRITE);
787 * fetch.setLockTimeout(-1); // no limit }
788 */
789 actions = q.getResultList();
790 return actions;
791 }
792 catch (IllegalStateException e) {
793 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
794 }
795 }
796 });
797 return actions;
798 }
799
800 public List<CoordinatorActionBean> getRunningActionsOlderThan(final long checkAgeSecs, final boolean locking)
801 throws StoreException {
802 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
803 new Callable<List<CoordinatorActionBean>>() {
804 @SuppressWarnings("unchecked")
805 public List<CoordinatorActionBean> call() throws StoreException {
806 List<CoordinatorActionBean> actions;
807 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
808 try {
809 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_OLDER_THAN");
810 q.setParameter("lastModifiedTime", ts);
811 /*
812 * if (locking) { OpenJPAQuery oq =
813 * OpenJPAPersistence.cast(q); FetchPlan fetch =
814 * oq.getFetchPlan();
815 * fetch.setReadLockMode(LockModeType.WRITE);
816 * fetch.setLockTimeout(-1); // no limit }
817 */
818 actions = q.getResultList();
819 return actions;
820 }
821 catch (IllegalStateException e) {
822 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
823 }
824 }
825 });
826 return actions;
827 }
828
829 public List<CoordinatorActionBean> getRecoveryActionsOlderThan(final long checkAgeSecs, final boolean locking)
830 throws StoreException {
831 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan",
832 new Callable<List<CoordinatorActionBean>>() {
833 @SuppressWarnings("unchecked")
834 public List<CoordinatorActionBean> call() throws StoreException {
835 List<CoordinatorActionBean> actions;
836 try {
837 Query q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN");
838 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
839 q.setParameter("lastModifiedTime", ts);
840 /*
841 * if (locking) { OpenJPAQuery oq =
842 * OpenJPAPersistence.cast(q); FetchPlan fetch =
843 * oq.getFetchPlan();
844 * fetch.setReadLockMode(LockModeType.WRITE);
845 * fetch.setLockTimeout(-1); // no limit }
846 */
847 actions = q.getResultList();
848 return actions;
849 }
850 catch (IllegalStateException e) {
851 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
852 }
853 }
854 });
855 return actions;
856 }
857
858 /**
859 * Get coordinator action beans for given start date and end date
860 *
861 * @param startDate
862 * @param endDate
863 * @return list of coordinator action beans
864 * @throws StoreException
865 */
866 public List<CoordinatorActionBean> getCoordActionsForDates(final String jobId, final Date startDate,
867 final Date endDate)
868 throws StoreException {
869 List<CoordinatorActionBean> actions = doOperation("getCoordActionsForDates",
870 new Callable<List<CoordinatorActionBean>>() {
871 @SuppressWarnings("unchecked")
872 public List<CoordinatorActionBean> call() throws StoreException {
873 List<CoordinatorActionBean> actions;
874 try {
875 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_DATES");
876 q.setParameter("jobId", jobId);
877 q.setParameter("startTime", new Timestamp(startDate.getTime()));
878 q.setParameter("endTime", new Timestamp(endDate.getTime()));
879 actions = q.getResultList();
880
881 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>();
882 for (CoordinatorActionBean a : actions) {
883 CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
884 actionList.add(aa);
885 }
886 return actionList;
887 }
888 catch (IllegalStateException e) {
889 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
890 }
891 }
892 });
893 return actions;
894 }
895
896 /**
897 * Get coordinator action bean for given date
898 *
899 * @param nominalTime
900 * @return CoordinatorActionBean
901 * @throws StoreException
902 */
903 public CoordinatorActionBean getCoordActionForNominalTime(final String jobId, final Date nominalTime)
904 throws StoreException {
905 CoordinatorActionBean action = doOperation("getCoordActionForNominalTime",
906 new Callable<CoordinatorActionBean>() {
907 @SuppressWarnings("unchecked")
908 public CoordinatorActionBean call() throws StoreException {
909 List<CoordinatorActionBean> actions;
910 Query q = entityManager.createNamedQuery("GET_ACTION_FOR_NOMINALTIME");
911 q.setParameter("jobId", jobId);
912 q.setParameter("nominalTime", new Timestamp(nominalTime.getTime()));
913 actions = q.getResultList();
914
915 CoordinatorActionBean action = null;
916 if (actions.size() > 0) {
917 action = actions.get(0);
918 }
919 else {
920 throw new StoreException(ErrorCode.E0605, DateUtils.formatDateOozieTZ(nominalTime));
921 }
922 return getBeanForRunningCoordAction(action);
923 }
924 });
925 return action;
926 }
927
928 public List<String> getRecoveryActionsGroupByJobId(final long checkAgeSecs) throws StoreException {
929 List<String> jobids = doOperation("getRecoveryActionsGroupByJobId", new Callable<List<String>>() {
930 @SuppressWarnings("unchecked")
931 public List<String> call() throws StoreException {
932 List<String> jobids = new ArrayList<String>();
933 try {
934 Query q = entityManager.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID");
935 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
936 q.setParameter(1, ts);
937 List<Object[]> list = q.getResultList();
938
939 for (Object[] arr : list) {
940 if (arr != null && arr[0] != null) {
941 jobids.add((String) arr[0]);
942 }
943 }
944
945 return jobids;
946 }
947 catch (IllegalStateException e) {
948 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
949 }
950 }
951 });
952 return jobids;
953 }
954 }