001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.store; 020 021import java.sql.Connection; 022import java.sql.SQLException; 023import java.sql.Timestamp; 024import java.util.ArrayList; 025import java.util.Date; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.Callable; 029 030import javax.persistence.EntityManager; 031import javax.persistence.Query; 032 033import org.apache.oozie.ErrorCode; 034import org.apache.oozie.WorkflowActionBean; 035import org.apache.oozie.WorkflowJobBean; 036import org.apache.oozie.WorkflowsInfo; 037import org.apache.oozie.client.OozieClient; 038import org.apache.oozie.client.WorkflowJob.Status; 039import org.apache.oozie.executor.jpa.JPAExecutorException; 040import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 041import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 042import org.apache.oozie.service.InstrumentationService; 043import org.apache.oozie.service.SchemaService; 044import org.apache.oozie.service.Services; 045import org.apache.oozie.service.SchemaService.SchemaName; 046import org.apache.oozie.util.Instrumentation; 047import org.apache.oozie.util.ParamChecker; 048import org.apache.oozie.util.XLog; 049import org.apache.oozie.workflow.WorkflowException; 050import org.apache.openjpa.persistence.OpenJPAEntityManager; 051import org.apache.openjpa.persistence.OpenJPAPersistence; 052import org.apache.openjpa.persistence.OpenJPAQuery; 053import org.apache.openjpa.persistence.jdbc.FetchDirection; 054import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan; 055import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm; 056import org.apache.openjpa.persistence.jdbc.ResultSetType; 057 058/** 059 * DB Implementation of Workflow Store 060 */ 061public class WorkflowStore extends Store { 062 private Connection conn; 063 private EntityManager entityManager; 064 private boolean selectForUpdate; 065 private static final String INSTR_GROUP = "db"; 066 public static final int LOCK_TIMEOUT = 50000; 067 private static final String seletStr = "Select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, " 068 + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w"; 069 private static final String countStr = "Select count(w) from WorkflowJobBean w"; 070 071 public WorkflowStore() { 072 } 073 074 public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException { 075 super(); 076 conn = ParamChecker.notNull(connection, "conn"); 077 entityManager = getEntityManager(); 078 this.selectForUpdate = selectForUpdate; 079 } 080 081 public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException { 082 super(store); 083 conn = ParamChecker.notNull(connection, "conn"); 084 entityManager = getEntityManager(); 085 this.selectForUpdate = selectForUpdate; 086 } 087 088 public WorkflowStore(boolean selectForUpdate) throws StoreException { 089 super(); 090 entityManager = getEntityManager(); 091 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW); 092 OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager); 093 conn = (Connection) kem.getConnection(); 094 this.selectForUpdate = selectForUpdate; 095 } 096 097 public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException { 098 super(store); 099 entityManager = getEntityManager(); 100 this.selectForUpdate = selectForUpdate; 101 } 102 103 /** 104 * Create a Workflow and return a WorkflowJobBean. It also creates the process instance for the job. 105 * 106 * @param workflow workflow bean 107 * @throws StoreException 108 */ 109 110 public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException { 111 ParamChecker.notNull(workflow, "workflow"); 112 113 doOperation("insertWorkflow", new Callable<Void>() { 114 public Void call() throws SQLException, StoreException, WorkflowException { 115 entityManager.persist(workflow); 116 return null; 117 } 118 }); 119 } 120 121 /** 122 * Load the Workflow into a Bean and return it. Also load the Workflow Instance into the bean. And lock the Workflow 123 * depending on the locking parameter. 124 * 125 * @param id Workflow ID 126 * @param locking true if Workflow is to be locked 127 * @return WorkflowJobBean 128 * @throws StoreException 129 */ 130 public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException { 131 ParamChecker.notEmpty(id, "WorkflowID"); 132 WorkflowJobBean wfBean = doOperation("getWorkflow", new Callable<WorkflowJobBean>() { 133 public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException { 134 WorkflowJobBean wfBean = null; 135 wfBean = getWorkflowOnly(id, locking); 136 if (wfBean == null) { 137 throw new StoreException(ErrorCode.E0604, id); 138 } 139 /* 140 * WorkflowInstance wfInstance; //krishna and next line 141 * wfInstance = workflowLib.get(id); wfInstance = 142 * wfBean.get(wfBean.getWfInstance()); 143 * wfBean.setWorkflowInstance(wfInstance); 144 * wfBean.setWfInstance(wfInstance); 145 */ 146 return wfBean; 147 } 148 }); 149 return wfBean; 150 } 151 152 /** 153 * Get the number of Workflows with the given status. 154 * 155 * @param status Workflow Status. 156 * @return number of Workflows with given status. 157 * @throws StoreException 158 */ 159 public int getWorkflowCountWithStatus(final String status) throws StoreException { 160 ParamChecker.notEmpty(status, "status"); 161 Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() { 162 public Integer call() throws SQLException { 163 Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS"); 164 q.setParameter("status", status); 165 Long count = (Long) q.getSingleResult(); 166 return Integer.valueOf(count.intValue()); 167 } 168 }); 169 return cnt.intValue(); 170 } 171 172 /** 173 * Get the number of Workflows with the given status which was modified in given time limit. 174 * 175 * @param status Workflow Status. 176 * @param secs No. of seconds within which the workflow got modified. 177 * @return number of Workflows modified within given time with given status. 178 * @throws StoreException 179 */ 180 public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException { 181 ParamChecker.notEmpty(status, "status"); 182 ParamChecker.notEmpty(status, "secs"); 183 Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() { 184 public Integer call() throws SQLException { 185 Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS"); 186 Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000)); 187 q.setParameter("status", status); 188 q.setParameter("lastModTime", ts); 189 Long count = (Long) q.getSingleResult(); 190 return Integer.valueOf(count.intValue()); 191 } 192 }); 193 return cnt.intValue(); 194 } 195 196 /** 197 * Update the data from Workflow Bean to DB along with the workflow instance data. Action table is not updated 198 * 199 * @param wfBean Workflow Bean 200 * @throws StoreException If Workflow doesn't exist 201 */ 202 public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException { 203 ParamChecker.notNull(wfBean, "WorkflowJobBean"); 204 doOperation("updateWorkflow", new Callable<Void>() { 205 public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException { 206 WorkflowJobQueryExecutor.getInstance().executeUpdate( 207 WorkflowJobQueryExecutor.WorkflowJobQuery.UPDATE_WORKFLOW, wfBean); 208 return null; 209 } 210 }); 211 } 212 213 /** 214 * Create a new Action record in the ACTIONS table with the given Bean. 215 * 216 * @param action WorkflowActionBean 217 * @throws StoreException If the action is already present 218 */ 219 public void insertAction(final WorkflowActionBean action) throws StoreException { 220 ParamChecker.notNull(action, "WorkflowActionBean"); 221 doOperation("insertAction", new Callable<Void>() { 222 public Void call() throws SQLException, StoreException, WorkflowException { 223 entityManager.persist(action); 224 return null; 225 } 226 }); 227 } 228 229 /** 230 * Load the action data and returns a bean. 231 * 232 * @param id Action Id 233 * @param locking true if the action is to be locked 234 * @return Action Bean 235 * @throws StoreException If action doesn't exist 236 */ 237 public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException { 238 ParamChecker.notEmpty(id, "ActionID"); 239 WorkflowActionBean action = doOperation("getAction", new Callable<WorkflowActionBean>() { 240 public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException, 241 InterruptedException { 242 Query q = entityManager.createNamedQuery("GET_ACTION"); 243 /* 244 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q); 245 * FetchPlan fetch = oq.getFetchPlan(); 246 * fetch.setReadLockMode(LockModeType.WRITE); 247 * fetch.setLockTimeout(1000); // 1 seconds } 248 */ 249 WorkflowActionBean action = null; 250 q.setParameter("id", id); 251 List<WorkflowActionBean> actions = q.getResultList(); 252 // action = (WorkflowActionBean) q.getSingleResult(); 253 if (actions.size() > 0) { 254 action = actions.get(0); 255 } 256 else { 257 throw new StoreException(ErrorCode.E0605, id); 258 } 259 260 /* 261 * if (locking) return action; else 262 */ 263 // return action; 264 return getBeanForRunningAction(action); 265 } 266 }); 267 return action; 268 } 269 270 /** 271 * Update the given action bean to DB. 272 * 273 * @param action Action Bean 274 * @throws StoreException if action doesn't exist 275 */ 276 public void updateAction(final WorkflowActionBean action) throws StoreException { 277 ParamChecker.notNull(action, "WorkflowActionBean"); 278 doOperation("updateAction", new Callable<Void>() { 279 public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException { 280 WorkflowActionQueryExecutor.getInstance().executeUpdate( 281 WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action); 282 return null; 283 } 284 }); 285 } 286 287 /** 288 * Delete the Action with given id. 289 * 290 * @param id Action ID 291 * @throws StoreException if Action doesn't exist 292 */ 293 public void deleteAction(final String id) throws StoreException { 294 ParamChecker.notEmpty(id, "ActionID"); 295 doOperation("deleteAction", new Callable<Void>() { 296 public Void call() throws SQLException, StoreException, WorkflowException { 297 /* 298 * Query q = entityManager.createNamedQuery("DELETE_ACTION"); 299 * q.setParameter("id", id); q.executeUpdate(); 300 */ 301 WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id); 302 if (action != null) { 303 entityManager.remove(action); 304 } 305 return null; 306 } 307 }); 308 } 309 310 /** 311 * Loads all the actions for the given Workflow. Also locks all the actions if locking is true. 312 * 313 * @param wfId Workflow ID 314 * @param locking true if Actions are to be locked 315 * @return A List of WorkflowActionBean 316 * @throws StoreException 317 */ 318 public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking) 319 throws StoreException { 320 ParamChecker.notEmpty(wfId, "WorkflowID"); 321 List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow", 322 new Callable<List<WorkflowActionBean>>() { 323 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException, 324 InterruptedException { 325 List<WorkflowActionBean> actions; 326 List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>(); 327 try { 328 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW"); 329 330 /* 331 * OpenJPAQuery oq = OpenJPAPersistence.cast(q); 332 * if (locking) { // 333 * q.setHint("openjpa.FetchPlan.ReadLockMode" 334 * ,"WRITE"); FetchPlan fetch = oq.getFetchPlan(); 335 * fetch.setReadLockMode(LockModeType.WRITE); 336 * fetch.setLockTimeout(1000); // 1 seconds } 337 */ 338 q.setParameter("wfId", wfId); 339 actions = q.getResultList(); 340 for (WorkflowActionBean a : actions) { 341 WorkflowActionBean aa = getBeanForRunningAction(a); 342 actionList.add(aa); 343 } 344 } 345 catch (IllegalStateException e) { 346 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 347 } 348 /* 349 * if (locking) { return actions; } else { 350 */ 351 return actionList; 352 // } 353 } 354 }); 355 return actions; 356 } 357 358 /** 359 * Loads given number of actions for the given Workflow. Also locks all the actions if locking is true. 360 * 361 * @param wfId Workflow ID 362 * @param start offset for select statement 363 * @param len number of Workflow Actions to be returned 364 * @return A List of WorkflowActionBean 365 * @throws StoreException 366 */ 367 public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len) 368 throws StoreException { 369 ParamChecker.notEmpty(wfId, "WorkflowID"); 370 List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow", 371 new Callable<List<WorkflowActionBean>>() { 372 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException, 373 InterruptedException { 374 List<WorkflowActionBean> actions; 375 List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>(); 376 try { 377 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW"); 378 OpenJPAQuery oq = OpenJPAPersistence.cast(q); 379 q.setParameter("wfId", wfId); 380 q.setFirstResult(start - 1); 381 q.setMaxResults(len); 382 actions = q.getResultList(); 383 for (WorkflowActionBean a : actions) { 384 WorkflowActionBean aa = getBeanForRunningAction(a); 385 actionList.add(aa); 386 } 387 } 388 catch (IllegalStateException e) { 389 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 390 } 391 return actionList; 392 } 393 }); 394 return actions; 395 } 396 397 /** 398 * Load All the actions that are pending for more than given time. 399 * 400 * @param minimumPendingAgeSecs Minimum Pending age in seconds 401 * @return List of action beans 402 * @throws StoreException 403 */ 404 public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException { 405 List<WorkflowActionBean> actions = doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() { 406 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException { 407 Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000); 408 List<WorkflowActionBean> actionList = null; 409 try { 410 Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS"); 411 q.setParameter("pendingAge", ts); 412 actionList = q.getResultList(); 413 } 414 catch (IllegalStateException e) { 415 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 416 } 417 return actionList; 418 } 419 }); 420 return actions; 421 } 422 423 /** 424 * Load All the actions that are running and were last checked after now - miminumCheckAgeSecs 425 * 426 * @param checkAgeSecs check age in seconds. 427 * @return List of action beans. 428 * @throws StoreException 429 */ 430 public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException { 431 List<WorkflowActionBean> actions = doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() { 432 433 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException { 434 List<WorkflowActionBean> actions = new ArrayList<WorkflowActionBean>(); 435 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 436 try { 437 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS"); 438 q.setParameter("lastCheckTime", ts); 439 actions = q.getResultList(); 440 } 441 catch (IllegalStateException e) { 442 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 443 } 444 445 return actions; 446 } 447 }); 448 return actions; 449 } 450 451 /** 452 * Load All the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL. 453 * 454 * @param wfId String 455 * @return List of action beans 456 * @throws StoreException 457 */ 458 public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException { 459 List<WorkflowActionBean> actions = doOperation("GET_RETRY_MANUAL_ACTIONS", 460 new Callable<List<WorkflowActionBean>>() { 461 public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException { 462 List<WorkflowActionBean> actionList = null; 463 try { 464 Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS"); 465 q.setParameter("wfId", wfId); 466 actionList = q.getResultList(); 467 } 468 catch (IllegalStateException e) { 469 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 470 } 471 472 return actionList; 473 } 474 }); 475 return actions; 476 } 477 478 /** 479 * Loads all the jobs that are satisfying the given filter condition. Filters can be applied on user, group, 480 * appName, status. 481 * 482 * @param filter Filter condition 483 * @param start offset for select statement 484 * @param len number of Workflows to be returned 485 * @return A list of workflows 486 * @throws StoreException 487 */ 488 public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len) 489 throws StoreException { 490 491 WorkflowsInfo workFlowsInfo = doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() { 492 @SuppressWarnings("unchecked") 493 public WorkflowsInfo call() throws SQLException, StoreException { 494 495 List<String> orArray = new ArrayList<String>(); 496 List<String> colArray = new ArrayList<String>(); 497 List<String> valArray = new ArrayList<String>(); 498 StringBuilder sb = new StringBuilder(""); 499 boolean isStatus = false; 500 boolean isGroup = false; 501 boolean isAppName = false; 502 boolean isUser = false; 503 boolean isEnabled = false; 504 int index = 0; 505 for (Map.Entry<String, List<String>> entry : filter.entrySet()) { 506 String colName = null; 507 String colVar = null; 508 if (entry.getKey().equals(OozieClient.FILTER_GROUP)) { 509 List<String> values = filter.get(OozieClient.FILTER_GROUP); 510 colName = "group"; 511 for (int i = 0; i < values.size(); i++) { 512 colVar = "group"; 513 colVar = colVar + index; 514 if (!isEnabled && !isGroup) { 515 sb.append(seletStr).append(" where w.group IN (:group" + index); 516 isGroup = true; 517 isEnabled = true; 518 } 519 else { 520 if (isEnabled && !isGroup) { 521 sb.append(" and w.group IN (:group" + index); 522 isGroup = true; 523 } 524 else { 525 if (isGroup) { 526 sb.append(", :group" + index); 527 } 528 } 529 } 530 if (i == values.size() - 1) { 531 sb.append(")"); 532 } 533 index++; 534 valArray.add(values.get(i)); 535 orArray.add(colName); 536 colArray.add(colVar); 537 } 538 } 539 else { 540 if (entry.getKey().equals(OozieClient.FILTER_STATUS)) { 541 List<String> values = filter.get(OozieClient.FILTER_STATUS); 542 colName = "status"; 543 for (int i = 0; i < values.size(); i++) { 544 colVar = "status"; 545 colVar = colVar + index; 546 if (!isEnabled && !isStatus) { 547 sb.append(seletStr).append(" where w.statusStr IN (:status" + index); 548 isStatus = true; 549 isEnabled = true; 550 } 551 else { 552 if (isEnabled && !isStatus) { 553 sb.append(" and w.statusStr IN (:status" + index); 554 isStatus = true; 555 } 556 else { 557 if (isStatus) { 558 sb.append(", :status" + index); 559 } 560 } 561 } 562 if (i == values.size() - 1) { 563 sb.append(")"); 564 } 565 index++; 566 valArray.add(values.get(i)); 567 orArray.add(colName); 568 colArray.add(colVar); 569 } 570 } 571 else { 572 if (entry.getKey().equals(OozieClient.FILTER_NAME)) { 573 List<String> values = filter.get(OozieClient.FILTER_NAME); 574 colName = "appName"; 575 for (int i = 0; i < values.size(); i++) { 576 colVar = "appName"; 577 colVar = colVar + index; 578 if (!isEnabled && !isAppName) { 579 sb.append(seletStr).append(" where w.appName IN (:appName" + index); 580 isAppName = true; 581 isEnabled = true; 582 } 583 else { 584 if (isEnabled && !isAppName) { 585 sb.append(" and w.appName IN (:appName" + index); 586 isAppName = true; 587 } 588 else { 589 if (isAppName) { 590 sb.append(", :appName" + index); 591 } 592 } 593 } 594 if (i == values.size() - 1) { 595 sb.append(")"); 596 } 597 index++; 598 valArray.add(values.get(i)); 599 orArray.add(colName); 600 colArray.add(colVar); 601 } 602 } 603 else { 604 if (entry.getKey().equals(OozieClient.FILTER_USER)) { 605 List<String> values = filter.get(OozieClient.FILTER_USER); 606 colName = "user"; 607 for (int i = 0; i < values.size(); i++) { 608 colVar = "user"; 609 colVar = colVar + index; 610 if (!isEnabled && !isUser) { 611 sb.append(seletStr).append(" where w.user IN (:user" + index); 612 isUser = true; 613 isEnabled = true; 614 } 615 else { 616 if (isEnabled && !isUser) { 617 sb.append(" and w.user IN (:user" + index); 618 isUser = true; 619 } 620 else { 621 if (isUser) { 622 sb.append(", :user" + index); 623 } 624 } 625 } 626 if (i == values.size() - 1) { 627 sb.append(")"); 628 } 629 index++; 630 valArray.add(values.get(i)); 631 orArray.add(colName); 632 colArray.add(colVar); 633 } 634 } 635 } 636 } 637 } 638 } 639 640 int realLen = 0; 641 642 Query q = null; 643 Query qTotal = null; 644 if (orArray.size() == 0) { 645 q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS"); 646 q.setFirstResult(start - 1); 647 q.setMaxResults(len); 648 qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT"); 649 } 650 else { 651 if (orArray.size() > 0) { 652 StringBuilder sbTotal = new StringBuilder(sb); 653 sb.append(" order by w.startTimestamp desc "); 654 XLog.getLog(getClass()).debug("Created String is **** " + sb.toString()); 655 q = entityManager.createQuery(sb.toString()); 656 q.setFirstResult(start - 1); 657 q.setMaxResults(len); 658 qTotal = entityManager.createQuery(sbTotal.toString().replace(seletStr, countStr)); 659 for (int i = 0; i < orArray.size(); i++) { 660 q.setParameter(colArray.get(i), valArray.get(i)); 661 qTotal.setParameter(colArray.get(i), valArray.get(i)); 662 } 663 } 664 } 665 666 OpenJPAQuery kq = OpenJPAPersistence.cast(q); 667 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan(); 668 fetch.setFetchBatchSize(20); 669 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE); 670 fetch.setFetchDirection(FetchDirection.FORWARD); 671 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST); 672 List<?> resultList = q.getResultList(); 673 List<Object[]> objectArrList = (List<Object[]>) resultList; 674 List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>(); 675 676 for (Object[] arr : objectArrList) { 677 WorkflowJobBean ww = getBeanForWorkflowFromArray(arr); 678 wfBeansList.add(ww); 679 } 680 681 realLen = ((Long) qTotal.getSingleResult()).intValue(); 682 683 return new WorkflowsInfo(wfBeansList, start, len, realLen); 684 } 685 }); 686 return workFlowsInfo; 687 688 } 689 690 /** 691 * Load the Workflow and all Action details and return a WorkflowJobBean. Workflow Instance is not loaded 692 * 693 * @param id Workflow Id 694 * @return Workflow Bean 695 * @throws StoreException If Workflow doesn't exist 696 */ 697 public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException { 698 ParamChecker.notEmpty(id, "WorkflowID"); 699 WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() { 700 public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException { 701 WorkflowJobBean wfBean = null; 702 wfBean = getWorkflowforInfo(id, false); 703 if (wfBean == null) { 704 throw new StoreException(ErrorCode.E0604, id); 705 } 706 else { 707 wfBean.setActions(getActionsForWorkflow(id, false)); 708 } 709 return wfBean; 710 } 711 }); 712 return wfBean; 713 } 714 715 /** 716 * Load the Workflow and subset Actions details and return a WorkflowJobBean. Workflow Instance is not loaded 717 * 718 * @param id Workflow Id 719 * @param start offset for select statement for actions 720 * @param len number of Workflow Actions to be returned 721 * @return Workflow Bean 722 * @throws StoreException If Workflow doesn't exist 723 */ 724 public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len) throws StoreException { 725 ParamChecker.notEmpty(id, "WorkflowID"); 726 WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() { 727 public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException { 728 WorkflowJobBean wfBean = null; 729 wfBean = getWorkflowforInfo(id, false); 730 if (wfBean == null) { 731 throw new StoreException(ErrorCode.E0604, id); 732 } 733 else { 734 wfBean.setActions(getActionsSubsetForWorkflow(id, start, len)); 735 } 736 return wfBean; 737 } 738 }); 739 return wfBean; 740 } 741 742 /** 743 * Get the Workflow ID with given external ID which will be assigned for the subworkflows. 744 * 745 * @param externalId external ID 746 * @return Workflow ID 747 * @throws StoreException if there is no job with external ID 748 */ 749 public String getWorkflowIdForExternalId(final String externalId) throws StoreException { 750 ParamChecker.notEmpty(externalId, "externalId"); 751 String wfId = doOperation("getWorkflowIdForExternalId", new Callable<String>() { 752 public String call() throws SQLException, StoreException { 753 String id = ""; 754 Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID"); 755 q.setParameter("externalId", externalId); 756 List<String> w = q.getResultList(); 757 if (w.size() == 0) { 758 id = ""; 759 } 760 else { 761 int index = w.size() - 1; 762 id = w.get(index); 763 } 764 return id; 765 } 766 }); 767 return wfId; 768 } 769 770 private static final long DAY_IN_MS = 24 * 60 * 60 * 1000; 771 772 /** 773 * Purge the Workflows Completed older than given days. 774 * 775 * @param olderThanDays number of days for which to preserve the workflows 776 * @throws StoreException 777 */ 778 public void purge(final long olderThanDays, final int limit) throws StoreException { 779 doOperation("purge", new Callable<Void>() { 780 public Void call() throws SQLException, StoreException, WorkflowException { 781 Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS)); 782 Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN"); 783 q.setParameter("endTime", maxEndTime); 784 q.setMaxResults(limit); 785 List<WorkflowJobBean> workflows = q.getResultList(); 786 int actionDeleted = 0; 787 if (workflows.size() != 0) { 788 for (WorkflowJobBean w : workflows) { 789 String wfId = w.getId(); 790 entityManager.remove(w); 791 Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW"); 792 g.setParameter("wfId", wfId); 793 actionDeleted += g.executeUpdate(); 794 } 795 } 796 XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions " + actionDeleted); 797 return null; 798 } 799 }); 800 } 801 802 private <V> V doOperation(String name, Callable<V> command) throws StoreException { 803 try { 804 Instrumentation.Cron cron = new Instrumentation.Cron(); 805 cron.start(); 806 V retVal; 807 try { 808 retVal = command.call(); 809 } 810 finally { 811 cron.stop(); 812 } 813 Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron); 814 return retVal; 815 } 816 catch (StoreException ex) { 817 throw ex; 818 } 819 catch (SQLException ex) { 820 throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex); 821 } 822 catch (Exception e) { 823 throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e); 824 } 825 } 826 827 private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException, 828 InterruptedException, StoreException { 829 WorkflowJobBean wfBean = null; 830 Query q = entityManager.createNamedQuery("GET_WORKFLOW"); 831 /* 832 * if (locking) { // q.setHint("openjpa.FetchPlan.ReadLockMode","READ"); 833 * OpenJPAQuery oq = OpenJPAPersistence.cast(q); FetchPlan fetch = 834 * oq.getFetchPlan(); fetch.setReadLockMode(LockModeType.WRITE); 835 * fetch.setLockTimeout(-1); // unlimited } 836 */ 837 q.setParameter("id", id); 838 List<WorkflowJobBean> w = q.getResultList(); 839 if (w.size() > 0) { 840 wfBean = w.get(0); 841 } 842 return wfBean; 843 // return getBeanForRunningWorkflow(wfBean); 844 } 845 846 private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException, 847 InterruptedException, StoreException { 848 WorkflowJobBean wfBean = null; 849 Query q = entityManager.createNamedQuery("GET_WORKFLOW"); 850 q.setParameter("id", id); 851 List<WorkflowJobBean> w = q.getResultList(); 852 if (w.size() > 0) { 853 wfBean = w.get(0); 854 return getBeanForRunningWorkflow(wfBean); 855 } 856 return null; 857 } 858 859 private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException { 860 WorkflowJobBean wfBean = new WorkflowJobBean(); 861 wfBean.setId(w.getId()); 862 wfBean.setAppName(w.getAppName()); 863 wfBean.setAppPath(w.getAppPath()); 864 wfBean.setConfBlob(w.getConfBlob()); 865 wfBean.setGroup(w.getGroup()); 866 wfBean.setRun(w.getRun()); 867 wfBean.setUser(w.getUser()); 868 wfBean.setCreatedTime(w.getCreatedTime()); 869 wfBean.setEndTime(w.getEndTime()); 870 wfBean.setExternalId(w.getExternalId()); 871 wfBean.setLastModifiedTime(w.getLastModifiedTime()); 872 wfBean.setLogToken(w.getLogToken()); 873 wfBean.setProtoActionConfBlob(w.getProtoActionConfBlob()); 874 wfBean.setSlaXmlBlob(w.getSlaXmlBlob()); 875 wfBean.setStartTime(w.getStartTime()); 876 wfBean.setStatus(w.getStatus()); 877 wfBean.setWfInstanceBlob(w.getWfInstanceBlob()); 878 return wfBean; 879 } 880 881 private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) { 882 883 WorkflowJobBean wfBean = new WorkflowJobBean(); 884 wfBean.setId((String) arr[0]); 885 if (arr[1] != null) { 886 wfBean.setAppName((String) arr[1]); 887 } 888 if (arr[2] != null) { 889 wfBean.setStatus(Status.valueOf((String) arr[2])); 890 } 891 if (arr[3] != null) { 892 wfBean.setRun((Integer) arr[3]); 893 } 894 if (arr[4] != null) { 895 wfBean.setUser((String) arr[4]); 896 } 897 if (arr[5] != null) { 898 wfBean.setGroup((String) arr[5]); 899 } 900 if (arr[6] != null) { 901 wfBean.setCreatedTime((Timestamp) arr[6]); 902 } 903 if (arr[7] != null) { 904 wfBean.setStartTime((Timestamp) arr[7]); 905 } 906 if (arr[8] != null) { 907 wfBean.setLastModifiedTime((Timestamp) arr[8]); 908 } 909 if (arr[9] != null) { 910 wfBean.setEndTime((Timestamp) arr[9]); 911 } 912 return wfBean; 913 } 914 915 private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException { 916 if (a != null) { 917 WorkflowActionBean action = new WorkflowActionBean(); 918 action.setId(a.getId()); 919 action.setConfBlob(a.getConfBlob()); 920 action.setConsoleUrl(a.getConsoleUrl()); 921 action.setDataBlob(a.getDataBlob()); 922 action.setStatsBlob(a.getStatsBlob()); 923 action.setExternalChildIDsBlob(a.getExternalChildIDsBlob()); 924 action.setErrorInfo(a.getErrorCode(), a.getErrorMessage()); 925 action.setExternalId(a.getExternalId()); 926 action.setExternalStatus(a.getExternalStatus()); 927 action.setName(a.getName()); 928 action.setCred(a.getCred()); 929 action.setRetries(a.getRetries()); 930 action.setTrackerUri(a.getTrackerUri()); 931 action.setTransition(a.getTransition()); 932 action.setType(a.getType()); 933 action.setEndTime(a.getEndTime()); 934 action.setExecutionPath(a.getExecutionPath()); 935 action.setLastCheckTime(a.getLastCheckTime()); 936 action.setLogToken(a.getLogToken()); 937 if (a.isPending() == true) { 938 action.setPending(); 939 } 940 action.setPendingAge(a.getPendingAge()); 941 action.setSignalValue(a.getSignalValue()); 942 action.setSlaXmlBlob(a.getSlaXmlBlob()); 943 action.setStartTime(a.getStartTime()); 944 action.setStatus(a.getStatus()); 945 action.setJobId(a.getWfId()); 946 action.setUserRetryCount(a.getUserRetryCount()); 947 action.setUserRetryInterval(a.getUserRetryInterval()); 948 action.setUserRetryMax(a.getUserRetryMax()); 949 return action; 950 } 951 return null; 952 } 953}