This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.store;
019
020 import java.sql.Connection;
021 import java.sql.SQLException;
022 import java.sql.Timestamp;
023 import java.util.ArrayList;
024 import java.util.Date;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.concurrent.Callable;
028
029 import javax.persistence.EntityManager;
030 import javax.persistence.Query;
031
032 import org.apache.oozie.ErrorCode;
033 import org.apache.oozie.WorkflowActionBean;
034 import org.apache.oozie.WorkflowJobBean;
035 import org.apache.oozie.WorkflowsInfo;
036 import org.apache.oozie.client.OozieClient;
037 import org.apache.oozie.client.WorkflowJob.Status;
038 import org.apache.oozie.service.InstrumentationService;
039 import org.apache.oozie.service.SchemaService;
040 import org.apache.oozie.service.Services;
041 import org.apache.oozie.service.SchemaService.SchemaName;
042 import org.apache.oozie.util.Instrumentation;
043 import org.apache.oozie.util.ParamChecker;
044 import org.apache.oozie.util.XLog;
045 import org.apache.oozie.workflow.WorkflowException;
046 import org.apache.openjpa.persistence.OpenJPAEntityManager;
047 import org.apache.openjpa.persistence.OpenJPAPersistence;
048 import org.apache.openjpa.persistence.OpenJPAQuery;
049 import org.apache.openjpa.persistence.jdbc.FetchDirection;
050 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
051 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
052 import org.apache.openjpa.persistence.jdbc.ResultSetType;
053
054 /**
055 * DB Implementation of Workflow Store
056 */
057 public class WorkflowStore extends Store {
058 private Connection conn;
059 private EntityManager entityManager;
060 private boolean selectForUpdate;
061 private static final String INSTR_GROUP = "db";
062 public static final int LOCK_TIMEOUT = 50000;
063 private static final String seletStr = "Select w.id, w.appName, w.status, w.run, w.user, w.group, w.createdTimestamp, "
064 + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
065 private static final String countStr = "Select count(w) from WorkflowJobBean w";
066
067 public WorkflowStore() {
068 }
069
070 public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException {
071 super();
072 conn = ParamChecker.notNull(connection, "conn");
073 entityManager = getEntityManager();
074 this.selectForUpdate = selectForUpdate;
075 }
076
077 public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException {
078 super(store);
079 conn = ParamChecker.notNull(connection, "conn");
080 entityManager = getEntityManager();
081 this.selectForUpdate = selectForUpdate;
082 }
083
084 public WorkflowStore(boolean selectForUpdate) throws StoreException {
085 super();
086 entityManager = getEntityManager();
087 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
088 OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager);
089 conn = (Connection) kem.getConnection();
090 this.selectForUpdate = selectForUpdate;
091 }
092
093 public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException {
094 super(store);
095 entityManager = getEntityManager();
096 this.selectForUpdate = selectForUpdate;
097 }
098
099 /**
100 * Create a Workflow and return a WorkflowJobBean. It also creates the process instance for the job.
101 *
102 * @param workflow workflow bean
103 * @throws StoreException
104 */
105
106 public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException {
107 ParamChecker.notNull(workflow, "workflow");
108
109 doOperation("insertWorkflow", new Callable<Void>() {
110 public Void call() throws SQLException, StoreException, WorkflowException {
111 entityManager.persist(workflow);
112 return null;
113 }
114 });
115 }
116
117 /**
118 * Load the Workflow into a Bean and return it. Also load the Workflow Instance into the bean. And lock the Workflow
119 * depending on the locking parameter.
120 *
121 * @param id Workflow ID
122 * @param locking true if Workflow is to be locked
123 * @return WorkflowJobBean
124 * @throws StoreException
125 */
126 public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException {
127 ParamChecker.notEmpty(id, "WorkflowID");
128 WorkflowJobBean wfBean = doOperation("getWorkflow", new Callable<WorkflowJobBean>() {
129 public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException {
130 WorkflowJobBean wfBean = null;
131 wfBean = getWorkflowOnly(id, locking);
132 if (wfBean == null) {
133 throw new StoreException(ErrorCode.E0604, id);
134 }
135 /*
136 * WorkflowInstance wfInstance; //krishna and next line
137 * wfInstance = workflowLib.get(id); wfInstance =
138 * wfBean.get(wfBean.getWfInstance());
139 * wfBean.setWorkflowInstance(wfInstance);
140 * wfBean.setWfInstance(wfInstance);
141 */
142 return wfBean;
143 }
144 });
145 return wfBean;
146 }
147
148 /**
149 * Get the number of Workflows with the given status.
150 *
151 * @param status Workflow Status.
152 * @return number of Workflows with given status.
153 * @throws StoreException
154 */
155 public int getWorkflowCountWithStatus(final String status) throws StoreException {
156 ParamChecker.notEmpty(status, "status");
157 Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() {
158 public Integer call() throws SQLException {
159 Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS");
160 q.setParameter("status", status);
161 Long count = (Long) q.getSingleResult();
162 return Integer.valueOf(count.intValue());
163 }
164 });
165 return cnt.intValue();
166 }
167
168 /**
169 * Get the number of Workflows with the given status which was modified in given time limit.
170 *
171 * @param status Workflow Status.
172 * @param secs No. of seconds within which the workflow got modified.
173 * @return number of Workflows modified within given time with given status.
174 * @throws StoreException
175 */
176 public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException {
177 ParamChecker.notEmpty(status, "status");
178 ParamChecker.notEmpty(status, "secs");
179 Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() {
180 public Integer call() throws SQLException {
181 Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS");
182 Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000));
183 q.setParameter("status", status);
184 q.setParameter("lastModTime", ts);
185 Long count = (Long) q.getSingleResult();
186 return Integer.valueOf(count.intValue());
187 }
188 });
189 return cnt.intValue();
190 }
191
192 /**
193 * Update the data from Workflow Bean to DB along with the workflow instance data. Action table is not updated
194 *
195 * @param wfBean Workflow Bean
196 * @throws StoreException If Workflow doesn't exist
197 */
198 public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
199 ParamChecker.notNull(wfBean, "WorkflowJobBean");
200 doOperation("updateWorkflow", new Callable<Void>() {
201 public Void call() throws SQLException, StoreException, WorkflowException {
202 Query q = entityManager.createNamedQuery("UPDATE_WORKFLOW");
203 q.setParameter("id", wfBean.getId());
204 setWFQueryParameters(wfBean, q);
205 q.executeUpdate();
206 return null;
207 }
208 });
209 }
210
211 /**
212 * Create a new Action record in the ACTIONS table with the given Bean.
213 *
214 * @param action WorkflowActionBean
215 * @throws StoreException If the action is already present
216 */
217 public void insertAction(final WorkflowActionBean action) throws StoreException {
218 ParamChecker.notNull(action, "WorkflowActionBean");
219 doOperation("insertAction", new Callable<Void>() {
220 public Void call() throws SQLException, StoreException, WorkflowException {
221 entityManager.persist(action);
222 return null;
223 }
224 });
225 }
226
227 /**
228 * Load the action data and returns a bean.
229 *
230 * @param id Action Id
231 * @param locking true if the action is to be locked
232 * @return Action Bean
233 * @throws StoreException If action doesn't exist
234 */
235 public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException {
236 ParamChecker.notEmpty(id, "ActionID");
237 WorkflowActionBean action = doOperation("getAction", new Callable<WorkflowActionBean>() {
238 public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException,
239 InterruptedException {
240 Query q = entityManager.createNamedQuery("GET_ACTION");
241 /*
242 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
243 * FetchPlan fetch = oq.getFetchPlan();
244 * fetch.setReadLockMode(LockModeType.WRITE);
245 * fetch.setLockTimeout(1000); // 1 seconds }
246 */
247 WorkflowActionBean action = null;
248 q.setParameter("id", id);
249 List<WorkflowActionBean> actions = q.getResultList();
250 // action = (WorkflowActionBean) q.getSingleResult();
251 if (actions.size() > 0) {
252 action = actions.get(0);
253 }
254 else {
255 throw new StoreException(ErrorCode.E0605, id);
256 }
257
258 /*
259 * if (locking) return action; else
260 */
261 // return action;
262 return getBeanForRunningAction(action);
263 }
264 });
265 return action;
266 }
267
268 /**
269 * Update the given action bean to DB.
270 *
271 * @param action Action Bean
272 * @throws StoreException if action doesn't exist
273 */
274 public void updateAction(final WorkflowActionBean action) throws StoreException {
275 ParamChecker.notNull(action, "WorkflowActionBean");
276 doOperation("updateAction", new Callable<Void>() {
277 public Void call() throws SQLException, StoreException, WorkflowException {
278 Query q = entityManager.createNamedQuery("UPDATE_ACTION");
279 q.setParameter("id", action.getId());
280 setActionQueryParameters(action, q);
281 q.executeUpdate();
282 return null;
283 }
284 });
285 }
286
287 /**
288 * Delete the Action with given id.
289 *
290 * @param id Action ID
291 * @throws StoreException if Action doesn't exist
292 */
293 public void deleteAction(final String id) throws StoreException {
294 ParamChecker.notEmpty(id, "ActionID");
295 doOperation("deleteAction", new Callable<Void>() {
296 public Void call() throws SQLException, StoreException, WorkflowException {
297 /*
298 * Query q = entityManager.createNamedQuery("DELETE_ACTION");
299 * q.setParameter("id", id); q.executeUpdate();
300 */
301 WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id);
302 if (action != null) {
303 entityManager.remove(action);
304 }
305 return null;
306 }
307 });
308 }
309
310 /**
311 * Loads all the actions for the given Workflow. Also locks all the actions if locking is true.
312 *
313 * @param wfId Workflow ID
314 * @param locking true if Actions are to be locked
315 * @return A List of WorkflowActionBean
316 * @throws StoreException
317 */
318 public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking)
319 throws StoreException {
320 ParamChecker.notEmpty(wfId, "WorkflowID");
321 List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
322 new Callable<List<WorkflowActionBean>>() {
323 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
324 InterruptedException {
325 List<WorkflowActionBean> actions;
326 List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
327 try {
328 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
329
330 /*
331 * OpenJPAQuery oq = OpenJPAPersistence.cast(q);
332 * if (locking) { //
333 * q.setHint("openjpa.FetchPlan.ReadLockMode"
334 * ,"WRITE"); FetchPlan fetch = oq.getFetchPlan();
335 * fetch.setReadLockMode(LockModeType.WRITE);
336 * fetch.setLockTimeout(1000); // 1 seconds }
337 */
338 q.setParameter("wfId", wfId);
339 actions = q.getResultList();
340 for (WorkflowActionBean a : actions) {
341 WorkflowActionBean aa = getBeanForRunningAction(a);
342 actionList.add(aa);
343 }
344 }
345 catch (IllegalStateException e) {
346 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
347 }
348 /*
349 * if (locking) { return actions; } else {
350 */
351 return actionList;
352 // }
353 }
354 });
355 return actions;
356 }
357
358 /**
359 * Loads given number of actions for the given Workflow. Also locks all the actions if locking is true.
360 *
361 * @param wfId Workflow ID
362 * @param start offset for select statement
363 * @param len number of Workflow Actions to be returned
364 * @param locking true if Actions are to be locked
365 * @return A List of WorkflowActionBean
366 * @throws StoreException
367 */
368 public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len)
369 throws StoreException {
370 ParamChecker.notEmpty(wfId, "WorkflowID");
371 List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
372 new Callable<List<WorkflowActionBean>>() {
373 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
374 InterruptedException {
375 List<WorkflowActionBean> actions;
376 List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
377 try {
378 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
379 OpenJPAQuery oq = OpenJPAPersistence.cast(q);
380 q.setParameter("wfId", wfId);
381 q.setFirstResult(start - 1);
382 q.setMaxResults(len);
383 actions = q.getResultList();
384 for (WorkflowActionBean a : actions) {
385 WorkflowActionBean aa = getBeanForRunningAction(a);
386 actionList.add(aa);
387 }
388 }
389 catch (IllegalStateException e) {
390 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
391 }
392 return actionList;
393 }
394 });
395 return actions;
396 }
397
398 /**
399 * Load All the actions that are pending for more than given time.
400 *
401 * @param minimumPendingAgeSecs Minimum Pending age in seconds
402 * @return List of action beans
403 * @throws StoreException
404 */
405 public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException {
406 List<WorkflowActionBean> actions = doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() {
407 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
408 Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
409 List<WorkflowActionBean> actionList = null;
410 try {
411 Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS");
412 q.setParameter("pendingAge", ts);
413 actionList = q.getResultList();
414 }
415 catch (IllegalStateException e) {
416 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
417 }
418 return actionList;
419 }
420 });
421 return actions;
422 }
423
424 /**
425 * Load All the actions that are running and were last checked after now - miminumCheckAgeSecs
426 *
427 * @param checkAgeSecs check age in seconds.
428 * @return List of action beans.
429 * @throws StoreException
430 */
431 public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException {
432 List<WorkflowActionBean> actions = doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() {
433
434 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
435 List<WorkflowActionBean> actions = new ArrayList<WorkflowActionBean>();
436 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
437 try {
438 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS");
439 q.setParameter("lastCheckTime", ts);
440 actions = q.getResultList();
441 }
442 catch (IllegalStateException e) {
443 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
444 }
445
446 return actions;
447 }
448 });
449 return actions;
450 }
451
452 /**
453 * Load All the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL.
454 *
455 * @param wfId String
456 * @return List of action beans
457 * @throws StoreException
458 */
459 public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException {
460 List<WorkflowActionBean> actions = doOperation("GET_RETRY_MANUAL_ACTIONS",
461 new Callable<List<WorkflowActionBean>>() {
462 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
463 List<WorkflowActionBean> actionList = null;
464 try {
465 Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS");
466 q.setParameter("wfId", wfId);
467 actionList = q.getResultList();
468 }
469 catch (IllegalStateException e) {
470 throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
471 }
472
473 return actionList;
474 }
475 });
476 return actions;
477 }
478
479 /**
480 * Loads all the jobs that are satisfying the given filter condition. Filters can be applied on user, group,
481 * appName, status.
482 *
483 * @param filter Filter condition
484 * @param start offset for select statement
485 * @param len number of Workflows to be returned
486 * @return A list of workflows
487 * @throws StoreException
488 */
489 public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len)
490 throws StoreException {
491
492 WorkflowsInfo workFlowsInfo = doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() {
493 @SuppressWarnings("unchecked")
494 public WorkflowsInfo call() throws SQLException, StoreException {
495
496 List<String> orArray = new ArrayList<String>();
497 List<String> colArray = new ArrayList<String>();
498 List<String> valArray = new ArrayList<String>();
499 StringBuilder sb = new StringBuilder("");
500 boolean isStatus = false;
501 boolean isGroup = false;
502 boolean isAppName = false;
503 boolean isUser = false;
504 boolean isEnabled = false;
505 int index = 0;
506 for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
507 String colName = null;
508 String colVar = null;
509 if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
510 List<String> values = filter.get(OozieClient.FILTER_GROUP);
511 colName = "group";
512 for (int i = 0; i < values.size(); i++) {
513 colVar = "group";
514 colVar = colVar + index;
515 if (!isEnabled && !isGroup) {
516 sb.append(seletStr).append(" where w.group IN (:group" + index);
517 isGroup = true;
518 isEnabled = true;
519 }
520 else {
521 if (isEnabled && !isGroup) {
522 sb.append(" and w.group IN (:group" + index);
523 isGroup = true;
524 }
525 else {
526 if (isGroup) {
527 sb.append(", :group" + index);
528 }
529 }
530 }
531 if (i == values.size() - 1) {
532 sb.append(")");
533 }
534 index++;
535 valArray.add(values.get(i));
536 orArray.add(colName);
537 colArray.add(colVar);
538 }
539 }
540 else {
541 if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
542 List<String> values = filter.get(OozieClient.FILTER_STATUS);
543 colName = "status";
544 for (int i = 0; i < values.size(); i++) {
545 colVar = "status";
546 colVar = colVar + index;
547 if (!isEnabled && !isStatus) {
548 sb.append(seletStr).append(" where w.status IN (:status" + index);
549 isStatus = true;
550 isEnabled = true;
551 }
552 else {
553 if (isEnabled && !isStatus) {
554 sb.append(" and w.status IN (:status" + index);
555 isStatus = true;
556 }
557 else {
558 if (isStatus) {
559 sb.append(", :status" + index);
560 }
561 }
562 }
563 if (i == values.size() - 1) {
564 sb.append(")");
565 }
566 index++;
567 valArray.add(values.get(i));
568 orArray.add(colName);
569 colArray.add(colVar);
570 }
571 }
572 else {
573 if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
574 List<String> values = filter.get(OozieClient.FILTER_NAME);
575 colName = "appName";
576 for (int i = 0; i < values.size(); i++) {
577 colVar = "appName";
578 colVar = colVar + index;
579 if (!isEnabled && !isAppName) {
580 sb.append(seletStr).append(" where w.appName IN (:appName" + index);
581 isAppName = true;
582 isEnabled = true;
583 }
584 else {
585 if (isEnabled && !isAppName) {
586 sb.append(" and w.appName IN (:appName" + index);
587 isAppName = true;
588 }
589 else {
590 if (isAppName) {
591 sb.append(", :appName" + index);
592 }
593 }
594 }
595 if (i == values.size() - 1) {
596 sb.append(")");
597 }
598 index++;
599 valArray.add(values.get(i));
600 orArray.add(colName);
601 colArray.add(colVar);
602 }
603 }
604 else {
605 if (entry.getKey().equals(OozieClient.FILTER_USER)) {
606 List<String> values = filter.get(OozieClient.FILTER_USER);
607 colName = "user";
608 for (int i = 0; i < values.size(); i++) {
609 colVar = "user";
610 colVar = colVar + index;
611 if (!isEnabled && !isUser) {
612 sb.append(seletStr).append(" where w.user IN (:user" + index);
613 isUser = true;
614 isEnabled = true;
615 }
616 else {
617 if (isEnabled && !isUser) {
618 sb.append(" and w.user IN (:user" + index);
619 isUser = true;
620 }
621 else {
622 if (isUser) {
623 sb.append(", :user" + index);
624 }
625 }
626 }
627 if (i == values.size() - 1) {
628 sb.append(")");
629 }
630 index++;
631 valArray.add(values.get(i));
632 orArray.add(colName);
633 colArray.add(colVar);
634 }
635 }
636 }
637 }
638 }
639 }
640
641 int realLen = 0;
642
643 Query q = null;
644 Query qTotal = null;
645 if (orArray.size() == 0) {
646 q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS");
647 q.setFirstResult(start - 1);
648 q.setMaxResults(len);
649 qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT");
650 }
651 else {
652 if (orArray.size() > 0) {
653 StringBuilder sbTotal = new StringBuilder(sb);
654 sb.append(" order by w.startTimestamp desc ");
655 XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
656 q = entityManager.createQuery(sb.toString());
657 q.setFirstResult(start - 1);
658 q.setMaxResults(len);
659 qTotal = entityManager.createQuery(sbTotal.toString().replace(seletStr, countStr));
660 for (int i = 0; i < orArray.size(); i++) {
661 q.setParameter(colArray.get(i), valArray.get(i));
662 qTotal.setParameter(colArray.get(i), valArray.get(i));
663 }
664 }
665 }
666
667 OpenJPAQuery kq = OpenJPAPersistence.cast(q);
668 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
669 fetch.setFetchBatchSize(20);
670 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
671 fetch.setFetchDirection(FetchDirection.FORWARD);
672 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
673 List<?> resultList = q.getResultList();
674 List<Object[]> objectArrList = (List<Object[]>) resultList;
675 List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
676
677 for (Object[] arr : objectArrList) {
678 WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
679 wfBeansList.add(ww);
680 }
681
682 realLen = ((Long) qTotal.getSingleResult()).intValue();
683
684 return new WorkflowsInfo(wfBeansList, start, len, realLen);
685 }
686 });
687 return workFlowsInfo;
688
689 }
690
691 /**
692 * Load the Workflow and all Action details and return a WorkflowJobBean. Workflow Instance is not loaded
693 *
694 * @param id Workflow Id
695 * @return Workflow Bean
696 * @throws StoreException If Workflow doesn't exist
697 */
698 public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException {
699 ParamChecker.notEmpty(id, "WorkflowID");
700 WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
701 public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
702 WorkflowJobBean wfBean = null;
703 wfBean = getWorkflowforInfo(id, false);
704 if (wfBean == null) {
705 throw new StoreException(ErrorCode.E0604, id);
706 }
707 else {
708 wfBean.setActions(getActionsForWorkflow(id, false));
709 }
710 return wfBean;
711 }
712 });
713 return wfBean;
714 }
715
716 /**
717 * Load the Workflow and subset Actions details and return a WorkflowJobBean. Workflow Instance is not loaded
718 *
719 * @param id Workflow Id
720 * @param start offset for select statement for actions
721 * @param len number of Workflow Actions to be returned
722 * @return Workflow Bean
723 * @throws StoreException If Workflow doesn't exist
724 */
725 public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len) throws StoreException {
726 ParamChecker.notEmpty(id, "WorkflowID");
727 WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
728 public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
729 WorkflowJobBean wfBean = null;
730 wfBean = getWorkflowforInfo(id, false);
731 if (wfBean == null) {
732 throw new StoreException(ErrorCode.E0604, id);
733 }
734 else {
735 wfBean.setActions(getActionsSubsetForWorkflow(id, start, len));
736 }
737 return wfBean;
738 }
739 });
740 return wfBean;
741 }
742
743 /**
744 * Get the Workflow ID with given external ID which will be assigned for the subworkflows.
745 *
746 * @param externalId external ID
747 * @return Workflow ID
748 * @throws StoreException if there is no job with external ID
749 */
750 public String getWorkflowIdForExternalId(final String externalId) throws StoreException {
751 ParamChecker.notEmpty(externalId, "externalId");
752 String wfId = doOperation("getWorkflowIdForExternalId", new Callable<String>() {
753 public String call() throws SQLException, StoreException {
754 String id = "";
755 Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID");
756 q.setParameter("externalId", externalId);
757 List<String> w = q.getResultList();
758 if (w.size() == 0) {
759 id = "";
760 }
761 else {
762 int index = w.size() - 1;
763 id = w.get(index);
764 }
765 return id;
766 }
767 });
768 return wfId;
769 }
770
771 private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
772
773 /**
774 * Purge the Workflows Completed older than given days.
775 *
776 * @param olderThanDays number of days for which to preserve the workflows
777 * @throws StoreException
778 */
779 public void purge(final long olderThanDays, final int limit) throws StoreException {
780 doOperation("purge", new Callable<Void>() {
781 public Void call() throws SQLException, StoreException, WorkflowException {
782 Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
783 Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN");
784 q.setParameter("endTime", maxEndTime);
785 q.setMaxResults(limit);
786 List<WorkflowJobBean> workflows = q.getResultList();
787 int actionDeleted = 0;
788 if (workflows.size() != 0) {
789 for (WorkflowJobBean w : workflows) {
790 String wfId = w.getId();
791 entityManager.remove(w);
792 Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
793 g.setParameter("wfId", wfId);
794 actionDeleted += g.executeUpdate();
795 }
796 }
797 XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions " + actionDeleted);
798 return null;
799 }
800 });
801 }
802
803 private <V> V doOperation(String name, Callable<V> command) throws StoreException {
804 try {
805 Instrumentation.Cron cron = new Instrumentation.Cron();
806 cron.start();
807 V retVal;
808 try {
809 retVal = command.call();
810 }
811 finally {
812 cron.stop();
813 }
814 Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
815 return retVal;
816 }
817 catch (StoreException ex) {
818 throw ex;
819 }
820 catch (SQLException ex) {
821 throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
822 }
823 catch (Exception e) {
824 throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
825 }
826 }
827
828 private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException,
829 InterruptedException, StoreException {
830 WorkflowJobBean wfBean = null;
831 Query q = entityManager.createNamedQuery("GET_WORKFLOW");
832 /*
833 * if (locking) { // q.setHint("openjpa.FetchPlan.ReadLockMode","READ");
834 * OpenJPAQuery oq = OpenJPAPersistence.cast(q); FetchPlan fetch =
835 * oq.getFetchPlan(); fetch.setReadLockMode(LockModeType.WRITE);
836 * fetch.setLockTimeout(-1); // unlimited }
837 */
838 q.setParameter("id", id);
839 List<WorkflowJobBean> w = q.getResultList();
840 if (w.size() > 0) {
841 wfBean = w.get(0);
842 }
843 return wfBean;
844 // return getBeanForRunningWorkflow(wfBean);
845 }
846
847 private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException,
848 InterruptedException, StoreException {
849 WorkflowJobBean wfBean = null;
850 Query q = entityManager.createNamedQuery("GET_WORKFLOW");
851 q.setParameter("id", id);
852 List<WorkflowJobBean> w = q.getResultList();
853 if (w.size() > 0) {
854 wfBean = w.get(0);
855 return getBeanForRunningWorkflow(wfBean);
856 }
857 return null;
858 }
859
860 private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException {
861 WorkflowJobBean wfBean = new WorkflowJobBean();
862 wfBean.setId(w.getId());
863 wfBean.setAppName(w.getAppName());
864 wfBean.setAppPath(w.getAppPath());
865 wfBean.setConf(w.getConf());
866 wfBean.setGroup(w.getGroup());
867 wfBean.setRun(w.getRun());
868 wfBean.setUser(w.getUser());
869 wfBean.setCreatedTime(w.getCreatedTime());
870 wfBean.setEndTime(w.getEndTime());
871 wfBean.setExternalId(w.getExternalId());
872 wfBean.setLastModifiedTime(w.getLastModifiedTime());
873 wfBean.setLogToken(w.getLogToken());
874 wfBean.setProtoActionConf(w.getProtoActionConf());
875 wfBean.setSlaXml(w.getSlaXml());
876 wfBean.setStartTime(w.getStartTime());
877 wfBean.setStatus(w.getStatus());
878 wfBean.setWfInstance(w.getWfInstance());
879 return wfBean;
880 }
881
882 private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
883
884 WorkflowJobBean wfBean = new WorkflowJobBean();
885 wfBean.setId((String) arr[0]);
886 if (arr[1] != null) {
887 wfBean.setAppName((String) arr[1]);
888 }
889 if (arr[2] != null) {
890 wfBean.setStatus(Status.valueOf((String) arr[2]));
891 }
892 if (arr[3] != null) {
893 wfBean.setRun((Integer) arr[3]);
894 }
895 if (arr[4] != null) {
896 wfBean.setUser((String) arr[4]);
897 }
898 if (arr[5] != null) {
899 wfBean.setGroup((String) arr[5]);
900 }
901 if (arr[6] != null) {
902 wfBean.setCreatedTime((Timestamp) arr[6]);
903 }
904 if (arr[7] != null) {
905 wfBean.setStartTime((Timestamp) arr[7]);
906 }
907 if (arr[8] != null) {
908 wfBean.setLastModifiedTime((Timestamp) arr[8]);
909 }
910 if (arr[9] != null) {
911 wfBean.setEndTime((Timestamp) arr[9]);
912 }
913 return wfBean;
914 }
915
916 private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException {
917 if (a != null) {
918 WorkflowActionBean action = new WorkflowActionBean();
919 action.setId(a.getId());
920 action.setConf(a.getConf());
921 action.setConsoleUrl(a.getConsoleUrl());
922 action.setData(a.getData());
923 action.setStats(a.getStats());
924 action.setExternalChildIDs(a.getExternalChildIDs());
925 action.setErrorInfo(a.getErrorCode(), a.getErrorMessage());
926 action.setExternalId(a.getExternalId());
927 action.setExternalStatus(a.getExternalStatus());
928 action.setName(a.getName());
929 action.setCred(a.getCred());
930 action.setRetries(a.getRetries());
931 action.setTrackerUri(a.getTrackerUri());
932 action.setTransition(a.getTransition());
933 action.setType(a.getType());
934 action.setEndTime(a.getEndTime());
935 action.setExecutionPath(a.getExecutionPath());
936 action.setLastCheckTime(a.getLastCheckTime());
937 action.setLogToken(a.getLogToken());
938 if (a.getPending() == true) {
939 action.setPending();
940 }
941 action.setPendingAge(a.getPendingAge());
942 action.setSignalValue(a.getSignalValue());
943 action.setSlaXml(a.getSlaXml());
944 action.setStartTime(a.getStartTime());
945 action.setStatus(a.getStatus());
946 action.setJobId(a.getWfId());
947 action.setUserRetryCount(a.getUserRetryCount());
948 action.setUserRetryInterval(a.getUserRetryInterval());
949 action.setUserRetryMax(a.getUserRetryMax());
950 return action;
951 }
952 return null;
953 }
954
955 private void setWFQueryParameters(WorkflowJobBean wfBean, Query q) {
956 q.setParameter("appName", wfBean.getAppName());
957 q.setParameter("appPath", wfBean.getAppPath());
958 q.setParameter("conf", wfBean.getConf());
959 q.setParameter("groupName", wfBean.getGroup());
960 q.setParameter("run", wfBean.getRun());
961 q.setParameter("user", wfBean.getUser());
962 q.setParameter("createdTime", wfBean.getCreatedTimestamp());
963 q.setParameter("endTime", wfBean.getEndTimestamp());
964 q.setParameter("externalId", wfBean.getExternalId());
965 q.setParameter("lastModTime", new Date());
966 q.setParameter("logToken", wfBean.getLogToken());
967 q.setParameter("protoActionConf", wfBean.getProtoActionConf());
968 q.setParameter("slaXml", wfBean.getSlaXml());
969 q.setParameter("startTime", wfBean.getStartTimestamp());
970 q.setParameter("status", wfBean.getStatusStr());
971 q.setParameter("wfInstance", wfBean.getWfInstance());
972 }
973
974 private void setActionQueryParameters(WorkflowActionBean aBean, Query q) {
975 q.setParameter("conf", aBean.getConf());
976 q.setParameter("consoleUrl", aBean.getConsoleUrl());
977 q.setParameter("data", aBean.getData());
978 q.setParameter("stats", aBean.getStats());
979 q.setParameter("externalChildIDs", aBean.getExternalChildIDs());
980 q.setParameter("errorCode", aBean.getErrorCode());
981 q.setParameter("errorMessage", aBean.getErrorMessage());
982 q.setParameter("externalId", aBean.getExternalId());
983 q.setParameter("externalStatus", aBean.getExternalStatus());
984 q.setParameter("name", aBean.getName());
985 q.setParameter("cred", aBean.getCred());
986 q.setParameter("retries", aBean.getRetries());
987 q.setParameter("trackerUri", aBean.getTrackerUri());
988 q.setParameter("transition", aBean.getTransition());
989 q.setParameter("type", aBean.getType());
990 q.setParameter("endTime", aBean.getEndTimestamp());
991 q.setParameter("executionPath", aBean.getExecutionPath());
992 q.setParameter("lastCheckTime", aBean.getLastCheckTimestamp());
993 q.setParameter("logToken", aBean.getLogToken());
994 q.setParameter("pending", aBean.isPending() ? 1 : 0);
995 q.setParameter("pendingAge", aBean.getPendingAgeTimestamp());
996 q.setParameter("signalValue", aBean.getSignalValue());
997 q.setParameter("slaXml", aBean.getSlaXml());
998 q.setParameter("startTime", aBean.getStartTimestamp());
999 q.setParameter("status", aBean.getStatusStr());
1000 q.setParameter("wfId", aBean.getWfId());
1001 }
1002 }