001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.store; 019 020import java.sql.Connection; 021import java.sql.SQLException; 022import java.sql.Timestamp; 023import java.util.ArrayList; 024import java.util.Date; 025import java.util.List; 026import java.util.Map; 027import java.util.concurrent.Callable; 028 029import javax.persistence.EntityManager; 030import javax.persistence.Query; 031 032import org.apache.oozie.ErrorCode; 033import org.apache.oozie.WorkflowActionBean; 034import org.apache.oozie.WorkflowJobBean; 035import org.apache.oozie.WorkflowsInfo; 036import org.apache.oozie.client.OozieClient; 037import org.apache.oozie.client.WorkflowJob.Status; 038import org.apache.oozie.executor.jpa.JPAExecutorException; 039import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 040import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 041import org.apache.oozie.service.InstrumentationService; 042import org.apache.oozie.service.SchemaService; 043import org.apache.oozie.service.Services; 044import org.apache.oozie.service.SchemaService.SchemaName; 045import org.apache.oozie.util.Instrumentation; 046import org.apache.oozie.util.ParamChecker; 047import org.apache.oozie.util.XLog; 048import org.apache.oozie.workflow.WorkflowException; 049import org.apache.openjpa.persistence.OpenJPAEntityManager; 050import org.apache.openjpa.persistence.OpenJPAPersistence; 051import org.apache.openjpa.persistence.OpenJPAQuery; 052import org.apache.openjpa.persistence.jdbc.FetchDirection; 053import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan; 054import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm; 055import org.apache.openjpa.persistence.jdbc.ResultSetType; 056 057/** 058 * DB Implementation of Workflow Store 059 */ 060public class WorkflowStore extends Store { 061 private Connection conn; 062 private EntityManager entityManager; 063 private boolean selectForUpdate; 064 private static final String INSTR_GROUP = "db"; 065 public static final int LOCK_TIMEOUT = 50000; 066 private static final String seletStr = "Select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, " 067 + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w"; 068 private static final String countStr = "Select count(w) from WorkflowJobBean w"; 069 070 public WorkflowStore() { 071 } 072 073 public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException { 074 super(); 075 conn = ParamChecker.notNull(connection, "conn"); 076 entityManager = getEntityManager(); 077 this.selectForUpdate = selectForUpdate; 078 } 079 080 public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException { 081 super(store); 082 conn = ParamChecker.notNull(connection, "conn"); 083 entityManager = getEntityManager(); 084 this.selectForUpdate = selectForUpdate; 085 } 086 087 public WorkflowStore(boolean selectForUpdate) throws StoreException { 088 super(); 089 entityManager = getEntityManager(); 090 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW); 091 OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager); 092 conn = (Connection) kem.getConnection(); 093 this.selectForUpdate = selectForUpdate; 094 } 095 096 public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException { 097 super(store); 098 entityManager = getEntityManager(); 099 this.selectForUpdate = selectForUpdate; 100 } 101 102 /** 103 * Create a Workflow and return a WorkflowJobBean. It also creates the process instance for the job. 104 * 105 * @param workflow workflow bean 106 * @throws StoreException 107 */ 108 109 public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException { 110 ParamChecker.notNull(workflow, "workflow"); 111 112 doOperation("insertWorkflow", new Callable<Void>() { 113 public Void call() throws SQLException, StoreException, WorkflowException { 114 entityManager.persist(workflow); 115 return null; 116 } 117 }); 118 } 119 120 /** 121 * Load the Workflow into a Bean and return it. Also load the Workflow Instance into the bean. And lock the Workflow 122 * depending on the locking parameter. 123 * 124 * @param id Workflow ID 125 * @param locking true if Workflow is to be locked 126 * @return WorkflowJobBean 127 * @throws StoreException 128 */ 129 public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException { 130 ParamChecker.notEmpty(id, "WorkflowID"); 131 WorkflowJobBean wfBean = doOperation("getWorkflow", new Callable<WorkflowJobBean>() { 132 public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException { 133 WorkflowJobBean wfBean = null; 134 wfBean = getWorkflowOnly(id, locking); 135 if (wfBean == null) { 136 throw new StoreException(ErrorCode.E0604, id); 137 } 138 /* 139 * WorkflowInstance wfInstance; //krishna and next line 140 * wfInstance = workflowLib.get(id); wfInstance = 141 * wfBean.get(wfBean.getWfInstance()); 142 * wfBean.setWorkflowInstance(wfInstance); 143 * wfBean.setWfInstance(wfInstance); 144 */ 145 return wfBean; 146 } 147 }); 148 return wfBean; 149 } 150 151 /** 152 * Get the number of Workflows with the given status. 153 * 154 * @param status Workflow Status. 155 * @return number of Workflows with given status. 156 * @throws StoreException 157 */ 158 public int getWorkflowCountWithStatus(final String status) throws StoreException { 159 ParamChecker.notEmpty(status, "status"); 160 Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() { 161 public Integer call() throws SQLException { 162 Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS"); 163 q.setParameter("status", status); 164 Long count = (Long) q.getSingleResult(); 165 return Integer.valueOf(count.intValue()); 166 } 167 }); 168 return cnt.intValue(); 169 } 170 171 /** 172 * Get the number of Workflows with the given status which was modified in given time limit. 173 * 174 * @param status Workflow Status. 175 * @param secs No. of seconds within which the workflow got modified. 176 * @return number of Workflows modified within given time with given status. 177 * @throws StoreException 178 */ 179 public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException { 180 ParamChecker.notEmpty(status, "status"); 181 ParamChecker.notEmpty(status, "secs"); 182 Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() { 183 public Integer call() throws SQLException { 184 Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS"); 185 Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000)); 186 q.setParameter("status", status); 187 q.setParameter("lastModTime", ts); 188 Long count = (Long) q.getSingleResult(); 189 return Integer.valueOf(count.intValue()); 190 } 191 }); 192 return cnt.intValue(); 193 } 194 195 /** 196 * Update the data from Workflow Bean to DB along with the workflow instance data. Action table is not updated 197 * 198 * @param wfBean Workflow Bean 199 * @throws StoreException If Workflow doesn't exist 200 */ 201 public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException { 202 ParamChecker.notNull(wfBean, "WorkflowJobBean"); 203 doOperation("updateWorkflow", new Callable<Void>() { 204 public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException { 205 WorkflowJobQueryExecutor.getInstance().executeUpdate( 206 WorkflowJobQueryExecutor.WorkflowJobQuery.UPDATE_WORKFLOW, wfBean); 207 return null; 208 } 209 }); 210 } 211 212 /** 213 * Create a new Action record in the ACTIONS table with the given Bean. 214 * 215 * @param action WorkflowActionBean 216 * @throws StoreException If the action is already present 217 */ 218 public void insertAction(final WorkflowActionBean action) throws StoreException { 219 ParamChecker.notNull(action, "WorkflowActionBean"); 220 doOperation("insertAction", new Callable<Void>() { 221 public Void call() throws SQLException, StoreException, WorkflowException { 222 entityManager.persist(action); 223 return null; 224 } 225 }); 226 } 227 228 /** 229 * Load the action data and returns a bean. 230 * 231 * @param id Action Id 232 * @param locking true if the action is to be locked 233 * @return Action Bean 234 * @throws StoreException If action doesn't exist 235 */ 236 public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException { 237 ParamChecker.notEmpty(id, "ActionID"); 238 WorkflowActionBean action = doOperation("getAction", new Callable<WorkflowActionBean>() { 239 public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException, 240 InterruptedException { 241 Query q = entityManager.createNamedQuery("GET_ACTION"); 242 /* 243 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q); 244 * FetchPlan fetch = oq.getFetchPlan(); 245 * fetch.setReadLockMode(LockModeType.WRITE); 246 * fetch.setLockTimeout(1000); // 1 seconds } 247 */ 248 WorkflowActionBean action = null; 249 q.setParameter("id", id); 250 List<WorkflowActionBean> actions = q.getResultList(); 251 // action = (WorkflowActionBean) q.getSingleResult(); 252 if (actions.size() > 0) { 253 action = actions.get(0); 254 } 255 else { 256 throw new StoreException(ErrorCode.E0605, id); 257 } 258 259 /* 260 * if (locking) return action; else 261 */ 262 // return action; 263 return getBeanForRunningAction(action); 264 } 265 }); 266 return action; 267 } 268 269 /** 270 * Update the given action bean to DB. 271 * 272 * @param action Action Bean 273 * @throws StoreException if action doesn't exist 274 */ 275 public void updateAction(final WorkflowActionBean action) throws StoreException { 276 ParamChecker.notNull(action, "WorkflowActionBean"); 277 doOperation("updateAction", new Callable<Void>() { 278 public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException { 279 WorkflowActionQueryExecutor.getInstance().executeUpdate( 280 WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action); 281 return null; 282 } 283 }); 284 } 285 286 /** 287 * Delete the Action with given id. 288 * 289 * @param id Action ID 290 * @throws StoreException if Action doesn't exist 291 */ 292 public void deleteAction(final String id) throws StoreException { 293 ParamChecker.notEmpty(id, "ActionID"); 294 doOperation("deleteAction", new Callable<Void>() { 295 public Void call() throws SQLException, StoreException, WorkflowException { 296 /* 297 * Query q = entityManager.createNamedQuery("DELETE_ACTION"); 298 * q.setParameter("id", id); q.executeUpdate(); 299 */ 300 WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id); 301 if (action != null) { 302 entityManager.remove(action); 303 } 304 return null; 305 } 306 }); 307 } 308 309 /** 310 * Loads all the actions for the given Workflow. Also locks all the actions if locking is true. 311 * 312 * @param wfId Workflow ID 313 * @param locking true if Actions are to be locked 314 * @return A List of WorkflowActionBean 315 * @throws StoreException 316 */ 317 public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking) 318 throws StoreException { 319 ParamChecker.notEmpty(wfId, "WorkflowID"); 320 List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow", 321 new Callable<List<WorkflowActionBean>>() { 322 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException, 323 InterruptedException { 324 List<WorkflowActionBean> actions; 325 List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>(); 326 try { 327 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW"); 328 329 /* 330 * OpenJPAQuery oq = OpenJPAPersistence.cast(q); 331 * if (locking) { // 332 * q.setHint("openjpa.FetchPlan.ReadLockMode" 333 * ,"WRITE"); FetchPlan fetch = oq.getFetchPlan(); 334 * fetch.setReadLockMode(LockModeType.WRITE); 335 * fetch.setLockTimeout(1000); // 1 seconds } 336 */ 337 q.setParameter("wfId", wfId); 338 actions = q.getResultList(); 339 for (WorkflowActionBean a : actions) { 340 WorkflowActionBean aa = getBeanForRunningAction(a); 341 actionList.add(aa); 342 } 343 } 344 catch (IllegalStateException e) { 345 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 346 } 347 /* 348 * if (locking) { return actions; } else { 349 */ 350 return actionList; 351 // } 352 } 353 }); 354 return actions; 355 } 356 357 /** 358 * Loads given number of actions for the given Workflow. Also locks all the actions if locking is true. 359 * 360 * @param wfId Workflow ID 361 * @param start offset for select statement 362 * @param len number of Workflow Actions to be returned 363 * @param locking true if Actions are to be locked 364 * @return A List of WorkflowActionBean 365 * @throws StoreException 366 */ 367 public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len) 368 throws StoreException { 369 ParamChecker.notEmpty(wfId, "WorkflowID"); 370 List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow", 371 new Callable<List<WorkflowActionBean>>() { 372 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException, 373 InterruptedException { 374 List<WorkflowActionBean> actions; 375 List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>(); 376 try { 377 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW"); 378 OpenJPAQuery oq = OpenJPAPersistence.cast(q); 379 q.setParameter("wfId", wfId); 380 q.setFirstResult(start - 1); 381 q.setMaxResults(len); 382 actions = q.getResultList(); 383 for (WorkflowActionBean a : actions) { 384 WorkflowActionBean aa = getBeanForRunningAction(a); 385 actionList.add(aa); 386 } 387 } 388 catch (IllegalStateException e) { 389 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 390 } 391 return actionList; 392 } 393 }); 394 return actions; 395 } 396 397 /** 398 * Load All the actions that are pending for more than given time. 399 * 400 * @param minimumPendingAgeSecs Minimum Pending age in seconds 401 * @return List of action beans 402 * @throws StoreException 403 */ 404 public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException { 405 List<WorkflowActionBean> actions = doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() { 406 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException { 407 Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000); 408 List<WorkflowActionBean> actionList = null; 409 try { 410 Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS"); 411 q.setParameter("pendingAge", ts); 412 actionList = q.getResultList(); 413 } 414 catch (IllegalStateException e) { 415 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 416 } 417 return actionList; 418 } 419 }); 420 return actions; 421 } 422 423 /** 424 * Load All the actions that are running and were last checked after now - miminumCheckAgeSecs 425 * 426 * @param checkAgeSecs check age in seconds. 427 * @return List of action beans. 428 * @throws StoreException 429 */ 430 public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException { 431 List<WorkflowActionBean> actions = doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() { 432 433 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException { 434 List<WorkflowActionBean> actions = new ArrayList<WorkflowActionBean>(); 435 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 436 try { 437 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS"); 438 q.setParameter("lastCheckTime", ts); 439 actions = q.getResultList(); 440 } 441 catch (IllegalStateException e) { 442 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 443 } 444 445 return actions; 446 } 447 }); 448 return actions; 449 } 450 451 /** 452 * Load All the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL. 453 * 454 * @param wfId String 455 * @return List of action beans 456 * @throws StoreException 457 */ 458 public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException { 459 List<WorkflowActionBean> actions = doOperation("GET_RETRY_MANUAL_ACTIONS", 460 new Callable<List<WorkflowActionBean>>() { 461 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException { 462 List<WorkflowActionBean> actionList = null; 463 try { 464 Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS"); 465 q.setParameter("wfId", wfId); 466 actionList = q.getResultList(); 467 } 468 catch (IllegalStateException e) { 469 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 470 } 471 472 return actionList; 473 } 474 }); 475 return actions; 476 } 477 478 /** 479 * Loads all the jobs that are satisfying the given filter condition. Filters can be applied on user, group, 480 * appName, status. 481 * 482 * @param filter Filter condition 483 * @param start offset for select statement 484 * @param len number of Workflows to be returned 485 * @return A list of workflows 486 * @throws StoreException 487 */ 488 public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len) 489 throws StoreException { 490 491 WorkflowsInfo workFlowsInfo = doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() { 492 @SuppressWarnings("unchecked") 493 public WorkflowsInfo call() throws SQLException, StoreException { 494 495 List<String> orArray = new ArrayList<String>(); 496 List<String> colArray = new ArrayList<String>(); 497 List<String> valArray = new ArrayList<String>(); 498 StringBuilder sb = new StringBuilder(""); 499 boolean isStatus = false; 500 boolean isGroup = false; 501 boolean isAppName = false; 502 boolean isUser = false; 503 boolean isEnabled = false; 504 int index = 0; 505 for (Map.Entry<String, List<String>> entry : filter.entrySet()) { 506 String colName = null; 507 String colVar = null; 508 if (entry.getKey().equals(OozieClient.FILTER_GROUP)) { 509 List<String> values = filter.get(OozieClient.FILTER_GROUP); 510 colName = "group"; 511 for (int i = 0; i < values.size(); i++) { 512 colVar = "group"; 513 colVar = colVar + index; 514 if (!isEnabled && !isGroup) { 515 sb.append(seletStr).append(" where w.group IN (:group" + index); 516 isGroup = true; 517 isEnabled = true; 518 } 519 else { 520 if (isEnabled && !isGroup) { 521 sb.append(" and w.group IN (:group" + index); 522 isGroup = true; 523 } 524 else { 525 if (isGroup) { 526 sb.append(", :group" + index); 527 } 528 } 529 } 530 if (i == values.size() - 1) { 531 sb.append(")"); 532 } 533 index++; 534 valArray.add(values.get(i)); 535 orArray.add(colName); 536 colArray.add(colVar); 537 } 538 } 539 else { 540 if (entry.getKey().equals(OozieClient.FILTER_STATUS)) { 541 List<String> values = filter.get(OozieClient.FILTER_STATUS); 542 colName = "status"; 543 for (int i = 0; i < values.size(); i++) { 544 colVar = "status"; 545 colVar = colVar + index; 546 if (!isEnabled && !isStatus) { 547 sb.append(seletStr).append(" where w.statusStr IN (:status" + index); 548 isStatus = true; 549 isEnabled = true; 550 } 551 else { 552 if (isEnabled && !isStatus) { 553 sb.append(" and w.statusStr IN (:status" + index); 554 isStatus = true; 555 } 556 else { 557 if (isStatus) { 558 sb.append(", :status" + index); 559 } 560 } 561 } 562 if (i == values.size() - 1) { 563 sb.append(")"); 564 } 565 index++; 566 valArray.add(values.get(i)); 567 orArray.add(colName); 568 colArray.add(colVar); 569 } 570 } 571 else { 572 if (entry.getKey().equals(OozieClient.FILTER_NAME)) { 573 List<String> values = filter.get(OozieClient.FILTER_NAME); 574 colName = "appName"; 575 for (int i = 0; i < values.size(); i++) { 576 colVar = "appName"; 577 colVar = colVar + index; 578 if (!isEnabled && !isAppName) { 579 sb.append(seletStr).append(" where w.appName IN (:appName" + index); 580 isAppName = true; 581 isEnabled = true; 582 } 583 else { 584 if (isEnabled && !isAppName) { 585 sb.append(" and w.appName IN (:appName" + index); 586 isAppName = true; 587 } 588 else { 589 if (isAppName) { 590 sb.append(", :appName" + index); 591 } 592 } 593 } 594 if (i == values.size() - 1) { 595 sb.append(")"); 596 } 597 index++; 598 valArray.add(values.get(i)); 599 orArray.add(colName); 600 colArray.add(colVar); 601 } 602 } 603 else { 604 if (entry.getKey().equals(OozieClient.FILTER_USER)) { 605 List<String> values = filter.get(OozieClient.FILTER_USER); 606 colName = "user"; 607 for (int i = 0; i < values.size(); i++) { 608 colVar = "user"; 609 colVar = colVar + index; 610 if (!isEnabled && !isUser) { 611 sb.append(seletStr).append(" where w.user IN (:user" + index); 612 isUser = true; 613 isEnabled = true; 614 } 615 else { 616 if (isEnabled && !isUser) { 617 sb.append(" and w.user IN (:user" + index); 618 isUser = true; 619 } 620 else { 621 if (isUser) { 622 sb.append(", :user" + index); 623 } 624 } 625 } 626 if (i == values.size() - 1) { 627 sb.append(")"); 628 } 629 index++; 630 valArray.add(values.get(i)); 631 orArray.add(colName); 632 colArray.add(colVar); 633 } 634 } 635 } 636 } 637 } 638 } 639 640 int realLen = 0; 641 642 Query q = null; 643 Query qTotal = null; 644 if (orArray.size() == 0) { 645 q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS"); 646 q.setFirstResult(start - 1); 647 q.setMaxResults(len); 648 qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT"); 649 } 650 else { 651 if (orArray.size() > 0) { 652 StringBuilder sbTotal = new StringBuilder(sb); 653 sb.append(" order by w.startTimestamp desc "); 654 XLog.getLog(getClass()).debug("Created String is **** " + sb.toString()); 655 q = entityManager.createQuery(sb.toString()); 656 q.setFirstResult(start - 1); 657 q.setMaxResults(len); 658 qTotal = entityManager.createQuery(sbTotal.toString().replace(seletStr, countStr)); 659 for (int i = 0; i < orArray.size(); i++) { 660 q.setParameter(colArray.get(i), valArray.get(i)); 661 qTotal.setParameter(colArray.get(i), valArray.get(i)); 662 } 663 } 664 } 665 666 OpenJPAQuery kq = OpenJPAPersistence.cast(q); 667 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan(); 668 fetch.setFetchBatchSize(20); 669 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE); 670 fetch.setFetchDirection(FetchDirection.FORWARD); 671 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST); 672 List<?> resultList = q.getResultList(); 673 List<Object[]> objectArrList = (List<Object[]>) resultList; 674 List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>(); 675 676 for (Object[] arr : objectArrList) { 677 WorkflowJobBean ww = getBeanForWorkflowFromArray(arr); 678 wfBeansList.add(ww); 679 } 680 681 realLen = ((Long) qTotal.getSingleResult()).intValue(); 682 683 return new WorkflowsInfo(wfBeansList, start, len, realLen); 684 } 685 }); 686 return workFlowsInfo; 687 688 } 689 690 /** 691 * Load the Workflow and all Action details and return a WorkflowJobBean. Workflow Instance is not loaded 692 * 693 * @param id Workflow Id 694 * @return Workflow Bean 695 * @throws StoreException If Workflow doesn't exist 696 */ 697 public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException { 698 ParamChecker.notEmpty(id, "WorkflowID"); 699 WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() { 700 public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException { 701 WorkflowJobBean wfBean = null; 702 wfBean = getWorkflowforInfo(id, false); 703 if (wfBean == null) { 704 throw new StoreException(ErrorCode.E0604, id); 705 } 706 else { 707 wfBean.setActions(getActionsForWorkflow(id, false)); 708 } 709 return wfBean; 710 } 711 }); 712 return wfBean; 713 } 714 715 /** 716 * Load the Workflow and subset Actions details and return a WorkflowJobBean. Workflow Instance is not loaded 717 * 718 * @param id Workflow Id 719 * @param start offset for select statement for actions 720 * @param len number of Workflow Actions to be returned 721 * @return Workflow Bean 722 * @throws StoreException If Workflow doesn't exist 723 */ 724 public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len) throws StoreException { 725 ParamChecker.notEmpty(id, "WorkflowID"); 726 WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() { 727 public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException { 728 WorkflowJobBean wfBean = null; 729 wfBean = getWorkflowforInfo(id, false); 730 if (wfBean == null) { 731 throw new StoreException(ErrorCode.E0604, id); 732 } 733 else { 734 wfBean.setActions(getActionsSubsetForWorkflow(id, start, len)); 735 } 736 return wfBean; 737 } 738 }); 739 return wfBean; 740 } 741 742 /** 743 * Get the Workflow ID with given external ID which will be assigned for the subworkflows. 744 * 745 * @param externalId external ID 746 * @return Workflow ID 747 * @throws StoreException if there is no job with external ID 748 */ 749 public String getWorkflowIdForExternalId(final String externalId) throws StoreException { 750 ParamChecker.notEmpty(externalId, "externalId"); 751 String wfId = doOperation("getWorkflowIdForExternalId", new Callable<String>() { 752 public String call() throws SQLException, StoreException { 753 String id = ""; 754 Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID"); 755 q.setParameter("externalId", externalId); 756 List<String> w = q.getResultList(); 757 if (w.size() == 0) { 758 id = ""; 759 } 760 else { 761 int index = w.size() - 1; 762 id = w.get(index); 763 } 764 return id; 765 } 766 }); 767 return wfId; 768 } 769 770 private static final long DAY_IN_MS = 24 * 60 * 60 * 1000; 771 772 /** 773 * Purge the Workflows Completed older than given days. 774 * 775 * @param olderThanDays number of days for which to preserve the workflows 776 * @throws StoreException 777 */ 778 public void purge(final long olderThanDays, final int limit) throws StoreException { 779 doOperation("purge", new Callable<Void>() { 780 public Void call() throws SQLException, StoreException, WorkflowException { 781 Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS)); 782 Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN"); 783 q.setParameter("endTime", maxEndTime); 784 q.setMaxResults(limit); 785 List<WorkflowJobBean> workflows = q.getResultList(); 786 int actionDeleted = 0; 787 if (workflows.size() != 0) { 788 for (WorkflowJobBean w : workflows) { 789 String wfId = w.getId(); 790 entityManager.remove(w); 791 Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW"); 792 g.setParameter("wfId", wfId); 793 actionDeleted += g.executeUpdate(); 794 } 795 } 796 XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions " + actionDeleted); 797 return null; 798 } 799 }); 800 } 801 802 private <V> V doOperation(String name, Callable<V> command) throws StoreException { 803 try { 804 Instrumentation.Cron cron = new Instrumentation.Cron(); 805 cron.start(); 806 V retVal; 807 try { 808 retVal = command.call(); 809 } 810 finally { 811 cron.stop(); 812 } 813 Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron); 814 return retVal; 815 } 816 catch (StoreException ex) { 817 throw ex; 818 } 819 catch (SQLException ex) { 820 throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex); 821 } 822 catch (Exception e) { 823 throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e); 824 } 825 } 826 827 private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException, 828 InterruptedException, StoreException { 829 WorkflowJobBean wfBean = null; 830 Query q = entityManager.createNamedQuery("GET_WORKFLOW"); 831 /* 832 * if (locking) { // q.setHint("openjpa.FetchPlan.ReadLockMode","READ"); 833 * OpenJPAQuery oq = OpenJPAPersistence.cast(q); FetchPlan fetch = 834 * oq.getFetchPlan(); fetch.setReadLockMode(LockModeType.WRITE); 835 * fetch.setLockTimeout(-1); // unlimited } 836 */ 837 q.setParameter("id", id); 838 List<WorkflowJobBean> w = q.getResultList(); 839 if (w.size() > 0) { 840 wfBean = w.get(0); 841 } 842 return wfBean; 843 // return getBeanForRunningWorkflow(wfBean); 844 } 845 846 private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException, 847 InterruptedException, StoreException { 848 WorkflowJobBean wfBean = null; 849 Query q = entityManager.createNamedQuery("GET_WORKFLOW"); 850 q.setParameter("id", id); 851 List<WorkflowJobBean> w = q.getResultList(); 852 if (w.size() > 0) { 853 wfBean = w.get(0); 854 return getBeanForRunningWorkflow(wfBean); 855 } 856 return null; 857 } 858 859 private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException { 860 WorkflowJobBean wfBean = new WorkflowJobBean(); 861 wfBean.setId(w.getId()); 862 wfBean.setAppName(w.getAppName()); 863 wfBean.setAppPath(w.getAppPath()); 864 wfBean.setConfBlob(w.getConfBlob()); 865 wfBean.setGroup(w.getGroup()); 866 wfBean.setRun(w.getRun()); 867 wfBean.setUser(w.getUser()); 868 wfBean.setCreatedTime(w.getCreatedTime()); 869 wfBean.setEndTime(w.getEndTime()); 870 wfBean.setExternalId(w.getExternalId()); 871 wfBean.setLastModifiedTime(w.getLastModifiedTime()); 872 wfBean.setLogToken(w.getLogToken()); 873 wfBean.setProtoActionConfBlob(w.getProtoActionConfBlob()); 874 wfBean.setSlaXmlBlob(w.getSlaXmlBlob()); 875 wfBean.setStartTime(w.getStartTime()); 876 wfBean.setStatus(w.getStatus()); 877 wfBean.setWfInstanceBlob(w.getWfInstanceBlob()); 878 return wfBean; 879 } 880 881 private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) { 882 883 WorkflowJobBean wfBean = new WorkflowJobBean(); 884 wfBean.setId((String) arr[0]); 885 if (arr[1] != null) { 886 wfBean.setAppName((String) arr[1]); 887 } 888 if (arr[2] != null) { 889 wfBean.setStatus(Status.valueOf((String) arr[2])); 890 } 891 if (arr[3] != null) { 892 wfBean.setRun((Integer) arr[3]); 893 } 894 if (arr[4] != null) { 895 wfBean.setUser((String) arr[4]); 896 } 897 if (arr[5] != null) { 898 wfBean.setGroup((String) arr[5]); 899 } 900 if (arr[6] != null) { 901 wfBean.setCreatedTime((Timestamp) arr[6]); 902 } 903 if (arr[7] != null) { 904 wfBean.setStartTime((Timestamp) arr[7]); 905 } 906 if (arr[8] != null) { 907 wfBean.setLastModifiedTime((Timestamp) arr[8]); 908 } 909 if (arr[9] != null) { 910 wfBean.setEndTime((Timestamp) arr[9]); 911 } 912 return wfBean; 913 } 914 915 private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException { 916 if (a != null) { 917 WorkflowActionBean action = new WorkflowActionBean(); 918 action.setId(a.getId()); 919 action.setConfBlob(a.getConfBlob()); 920 action.setConsoleUrl(a.getConsoleUrl()); 921 action.setDataBlob(a.getDataBlob()); 922 action.setStatsBlob(a.getStatsBlob()); 923 action.setExternalChildIDsBlob(a.getExternalChildIDsBlob()); 924 action.setErrorInfo(a.getErrorCode(), a.getErrorMessage()); 925 action.setExternalId(a.getExternalId()); 926 action.setExternalStatus(a.getExternalStatus()); 927 action.setName(a.getName()); 928 action.setCred(a.getCred()); 929 action.setRetries(a.getRetries()); 930 action.setTrackerUri(a.getTrackerUri()); 931 action.setTransition(a.getTransition()); 932 action.setType(a.getType()); 933 action.setEndTime(a.getEndTime()); 934 action.setExecutionPath(a.getExecutionPath()); 935 action.setLastCheckTime(a.getLastCheckTime()); 936 action.setLogToken(a.getLogToken()); 937 if (a.isPending() == true) { 938 action.setPending(); 939 } 940 action.setPendingAge(a.getPendingAge()); 941 action.setSignalValue(a.getSignalValue()); 942 action.setSlaXmlBlob(a.getSlaXmlBlob()); 943 action.setStartTime(a.getStartTime()); 944 action.setStatus(a.getStatus()); 945 action.setJobId(a.getWfId()); 946 action.setUserRetryCount(a.getUserRetryCount()); 947 action.setUserRetryInterval(a.getUserRetryInterval()); 948 action.setUserRetryMax(a.getUserRetryMax()); 949 return action; 950 } 951 return null; 952 } 953}