/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.store;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import javax.persistence.EntityManager;
import javax.persistence.Query;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob.Status;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.openjpa.persistence.OpenJPAEntityManager;
import org.apache.openjpa.persistence.OpenJPAPersistence;
import org.apache.openjpa.persistence.OpenJPAQuery;
import org.apache.openjpa.persistence.jdbc.FetchDirection;
import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
import org.apache.openjpa.persistence.jdbc.ResultSetType;

/**
 * DB implementation of the Workflow Store.
 * <p/>
 * Every public operation is funnelled through {@link #doOperation(String, Callable)}, which times the call,
 * records the timing under the {@code "db"} instrumentation group and converts any failure into a
 * {@link StoreException}.
 */
public class WorkflowStore extends Store {
    // NOTE(review): conn and selectForUpdate are assigned by the constructors but never read in this class.
    private Connection conn;
    private EntityManager entityManager;
    private boolean selectForUpdate;
    private static final String INSTR_GROUP = "db";
    public static final int LOCK_TIMEOUT = 50000;
    private static final String SELECT_STR = "Select w.id, w.appName, w.status, w.run, w.user, w.group, w.createdTimestamp, "
            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
    private static final String COUNT_STR = "Select count(w) from WorkflowJobBean w";
    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;

    public WorkflowStore() {
    }

    /**
     * Create a store with its own entity manager.
     *
     * @param connection JDBC connection, must not be null
     * @param selectForUpdate stored on the instance
     * @throws StoreException declared for the superclass constructor / entity manager lookup
     */
    public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException {
        super();
        conn = ParamChecker.notNull(connection, "conn");
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    /**
     * Create a store on top of an existing store.
     *
     * @param connection JDBC connection, must not be null
     * @param store store handed to the superclass constructor
     * @param selectForUpdate stored on the instance
     * @throws StoreException declared for the superclass constructor / entity manager lookup
     */
    public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException {
        super(store);
        conn = ParamChecker.notNull(connection, "conn");
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    /**
     * Create a store with its own entity manager and take the JDBC connection from that entity manager.
     *
     * @param selectForUpdate stored on the instance
     * @throws StoreException declared for the superclass constructor / entity manager lookup
     */
    public WorkflowStore(boolean selectForUpdate) throws StoreException {
        super();
        entityManager = getEntityManager();
        // The returned schema is not used here. The lookup is retained because it is unclear whether
        // callers rely on this constructor failing early when the schema service is unavailable.
        Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
        OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager);
        conn = (Connection) kem.getConnection();
        this.selectForUpdate = selectForUpdate;
    }

    /**
     * Create a store on top of an existing store. The JDBC connection field is left unset by this constructor.
     *
     * @param store store handed to the superclass constructor
     * @param selectForUpdate stored on the instance
     * @throws StoreException declared for the superclass constructor / entity manager lookup
     */
    public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException {
        super(store);
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    /**
     * Persist a new workflow job bean.
     *
     * @param workflow workflow bean, must not be null
     * @throws StoreException if the persist operation fails
     */
    public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException {
        ParamChecker.notNull(workflow, "workflow");

        doOperation("insertWorkflow", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                entityManager.persist(workflow);
                return null;
            }
        });
    }

    /**
     * Load the workflow with the given id.
     *
     * @param id Workflow ID, must not be empty
     * @param locking currently ignored: no lock is taken
     * @return the WorkflowJobBean returned by the <code>GET_WORKFLOW</code> named query
     * @throws StoreException with {@link ErrorCode#E0604} if no workflow has that id, or if the query fails
     */
    public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        return doOperation("getWorkflow", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException {
                WorkflowJobBean wfBean = getWorkflowOnly(id, locking);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                return wfBean;
            }
        });
    }

    /**
     * Get the number of Workflows with the given status.
     *
     * @param status Workflow Status, must not be empty
     * @return number of Workflows with given status, narrowed from the query's <code>Long</code> result
     * @throws StoreException if the query fails
     */
    public int getWorkflowCountWithStatus(final String status) throws StoreException {
        ParamChecker.notEmpty(status, "status");
        Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() {
            public Integer call() throws SQLException {
                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS");
                q.setParameter("status", status);
                Long count = (Long) q.getSingleResult();
                return Integer.valueOf(count.intValue());
            }
        });
        return cnt.intValue();
    }

    /**
     * Get the number of Workflows with the given status, using (now - secs) as the <code>lastModTime</code>
     * parameter of the <code>GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS</code> named query.
     *
     * @param status Workflow Status, must not be empty
     * @param secs No. of seconds subtracted from the current time to build the cut-off timestamp
     * @return the count returned by the query, narrowed from its <code>Long</code> result
     * @throws StoreException if the query fails
     */
    public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException {
        ParamChecker.notEmpty(status, "status");
        Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() {
            public Integer call() throws SQLException {
                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS");
                // 1000L forces long arithmetic; int multiplication overflows for secs above ~24 days.
                Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000L));
                q.setParameter("status", status);
                q.setParameter("lastModTime", ts);
                Long count = (Long) q.getSingleResult();
                return Integer.valueOf(count.intValue());
            }
        });
        return cnt.intValue();
    }

    /**
     * Run the <code>UPDATE_WORKFLOW</code> named query with the values of the given bean. The action table
     * is not touched by this method.
     *
     * @param wfBean Workflow Bean, must not be null
     * @throws StoreException if the update fails
     */
    public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
        ParamChecker.notNull(wfBean, "WorkflowJobBean");
        doOperation("updateWorkflow", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                Query q = entityManager.createNamedQuery("UPDATE_WORKFLOW");
                q.setParameter("id", wfBean.getId());
                setWFQueryParameters(wfBean, q);
                q.executeUpdate();
                return null;
            }
        });
    }

    /**
     * Persist a new action bean.
     *
     * @param action WorkflowActionBean, must not be null
     * @throws StoreException if the persist operation fails
     */
    public void insertAction(final WorkflowActionBean action) throws StoreException {
        ParamChecker.notNull(action, "WorkflowActionBean");
        doOperation("insertAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                entityManager.persist(action);
                return null;
            }
        });
    }

    /**
     * Load the action with the given id and return a copy of it.
     *
     * @param id Action Id, must not be empty
     * @param locking currently ignored: no lock is taken
     * @return a new Action Bean populated from the first row returned by the <code>GET_ACTION</code> query
     * @throws StoreException with {@link ErrorCode#E0605} if the action doesn't exist, or if the query fails
     */
    public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException {
        ParamChecker.notEmpty(id, "ActionID");
        return doOperation("getAction", new Callable<WorkflowActionBean>() {
            public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException,
                    InterruptedException {
                Query q = entityManager.createNamedQuery("GET_ACTION");
                q.setParameter("id", id);
                List<WorkflowActionBean> actions = q.getResultList();
                if (actions.isEmpty()) {
                    throw new StoreException(ErrorCode.E0605, id);
                }
                return getBeanForRunningAction(actions.get(0));
            }
        });
    }

    /**
     * Run the <code>UPDATE_ACTION</code> named query with the values of the given bean.
     *
     * @param action Action Bean, must not be null
     * @throws StoreException if the update fails
     */
    public void updateAction(final WorkflowActionBean action) throws StoreException {
        ParamChecker.notNull(action, "WorkflowActionBean");
        doOperation("updateAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                Query q = entityManager.createNamedQuery("UPDATE_ACTION");
                q.setParameter("id", action.getId());
                setActionQueryParameters(action, q);
                q.executeUpdate();
                return null;
            }
        });
    }

    /**
     * Delete the Action with given id. Nothing happens if no action with that id is found.
     *
     * @param id Action ID, must not be empty
     * @throws StoreException if the lookup or removal fails
     */
    public void deleteAction(final String id) throws StoreException {
        ParamChecker.notEmpty(id, "ActionID");
        doOperation("deleteAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id);
                if (action != null) {
                    entityManager.remove(action);
                }
                return null;
            }
        });
    }

    /**
     * Load all the actions for the given Workflow and return copies of them.
     *
     * @param wfId Workflow ID, must not be empty
     * @param locking currently ignored: no lock is taken
     * @return A List of WorkflowActionBean copies, in the order returned by the query
     * @throws StoreException with {@link ErrorCode#E0601} on <code>IllegalStateException</code>, or if the
     * query fails otherwise
     */
    public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking)
            throws StoreException {
        ParamChecker.notEmpty(wfId, "WorkflowID");
        return doOperation("getActionsForWorkflow", new Callable<List<WorkflowActionBean>>() {
            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
                    InterruptedException {
                List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
                try {
                    Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
                    q.setParameter("wfId", wfId);
                    List<WorkflowActionBean> actions = q.getResultList();
                    for (WorkflowActionBean a : actions) {
                        actionList.add(getBeanForRunningAction(a));
                    }
                }
                catch (IllegalStateException e) {
                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                }
                return actionList;
            }
        });
    }

    /**
     * Load a page of actions for the given Workflow and return copies of them.
     *
     * @param wfId Workflow ID, must not be empty
     * @param start 1-based offset; <code>start - 1</code> is used as the first result index
     * @param len maximum number of Workflow Actions to be returned
     * @return A List of WorkflowActionBean copies, in the order returned by the query
     * @throws StoreException with {@link ErrorCode#E0601} on <code>IllegalStateException</code>, or if the
     * query fails otherwise
     */
    public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len)
            throws StoreException {
        ParamChecker.notEmpty(wfId, "WorkflowID");
        // The instrumentation name is shared with getActionsForWorkflow; kept as is so existing metrics
        // do not change.
        return doOperation("getActionsForWorkflow", new Callable<List<WorkflowActionBean>>() {
            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
                    InterruptedException {
                List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
                try {
                    Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
                    q.setParameter("wfId", wfId);
                    q.setFirstResult(start - 1);
                    q.setMaxResults(len);
                    List<WorkflowActionBean> actions = q.getResultList();
                    for (WorkflowActionBean a : actions) {
                        actionList.add(getBeanForRunningAction(a));
                    }
                }
                catch (IllegalStateException e) {
                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                }
                return actionList;
            }
        });
    }

    /**
     * Run the <code>GET_PENDING_ACTIONS</code> named query with (now - minimumPendingAgeSecs) as its
     * <code>pendingAge</code> parameter.
     *
     * @param minimumPendingAgeSecs Minimum Pending age in seconds
     * @return List of action beans as returned by the query
     * @throws StoreException with {@link ErrorCode#E0601} on <code>IllegalStateException</code>, or if the
     * query fails otherwise
     */
    public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException {
        return doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() {
            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
                try {
                    Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS");
                    q.setParameter("pendingAge", ts);
                    List<WorkflowActionBean> actionList = q.getResultList();
                    return actionList;
                }
                catch (IllegalStateException e) {
                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                }
            }
        });
    }

    /**
     * Run the <code>GET_RUNNING_ACTIONS</code> named query with (now - checkAgeSecs) as its
     * <code>lastCheckTime</code> parameter.
     *
     * @param checkAgeSecs check age in seconds.
     * @return List of action beans as returned by the query
     * @throws StoreException with {@link ErrorCode#E0601} on <code>IllegalStateException</code>, or if the
     * query fails otherwise
     */
    public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException {
        return doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() {
            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
                try {
                    Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS");
                    q.setParameter("lastCheckTime", ts);
                    List<WorkflowActionBean> actions = q.getResultList();
                    return actions;
                }
                catch (IllegalStateException e) {
                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                }
            }
        });
    }

    /**
     * Run the <code>GET_RETRY_MANUAL_ACTIONS</code> named query for the given workflow.
     *
     * @param wfId workflow id passed as the query's <code>wfId</code> parameter
     * @return List of action beans as returned by the query
     * @throws StoreException with {@link ErrorCode#E0601} on <code>IllegalStateException</code>, or if the
     * query fails otherwise
     */
    public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException {
        return doOperation("GET_RETRY_MANUAL_ACTIONS", new Callable<List<WorkflowActionBean>>() {
            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                try {
                    Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS");
                    q.setParameter("wfId", wfId);
                    List<WorkflowActionBean> actionList = q.getResultList();
                    return actionList;
                }
                catch (IllegalStateException e) {
                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                }
            }
        });
    }

    /**
     * Load a page of jobs satisfying the given filter condition, plus the total number of matching jobs.
     * Filters can be applied on user, group, appName and status; unknown filter keys are ignored. Values
     * for one key are combined with <code>IN (...)</code>, different keys are combined with
     * <code>and</code>. Without any effective filter the <code>GET_WORKFLOWS_COLUMNS</code> and
     * <code>GET_WORKFLOWS_COUNT</code> named queries are used.
     *
     * @param filter Filter condition, keyed by the <code>OozieClient.FILTER_*</code> names
     * @param start 1-based offset; <code>start - 1</code> is used as the first result index
     * @param len maximum number of Workflows to be returned
     * @return the page of workflows together with <code>start</code>, <code>len</code> and the total count
     * @throws StoreException if a query fails
     */
    public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len)
            throws StoreException {
        return doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() {
            @SuppressWarnings("unchecked")
            public WorkflowsInfo call() throws SQLException, StoreException {
                // paramNames.get(i) is the JPQL parameter bound to paramValues.get(i).
                List<String> paramNames = new ArrayList<String>();
                List<String> paramValues = new ArrayList<String>();
                StringBuilder where = new StringBuilder();
                for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
                    String column = getFilterColumn(entry.getKey());
                    if (column != null) {
                        appendInClause(where, column, entry.getValue(), paramNames, paramValues);
                    }
                }

                Query q;
                Query qTotal;
                if (paramNames.isEmpty()) {
                    q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS");
                    q.setFirstResult(start - 1);
                    q.setMaxResults(len);
                    qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT");
                }
                else {
                    String selectJpql = SELECT_STR + where + " order by w.startTimestamp desc ";
                    XLog.getLog(getClass()).debug("Created String is **** " + selectJpql);
                    q = entityManager.createQuery(selectJpql);
                    q.setFirstResult(start - 1);
                    q.setMaxResults(len);
                    // The count query shares the where clause but needs no ordering.
                    qTotal = entityManager.createQuery(COUNT_STR + where);
                    for (int i = 0; i < paramNames.size(); i++) {
                        q.setParameter(paramNames.get(i), paramValues.get(i));
                        qTotal.setParameter(paramNames.get(i), paramValues.get(i));
                    }
                }

                OpenJPAQuery kq = OpenJPAPersistence.cast(q);
                JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
                fetch.setFetchBatchSize(20);
                fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
                fetch.setFetchDirection(FetchDirection.FORWARD);
                fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);

                // Each row is an Object[] holding the columns listed in SELECT_STR.
                List<Object[]> rows = (List<Object[]>) q.getResultList();
                List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();
                for (Object[] row : rows) {
                    wfBeansList.add(getBeanForWorkflowFromArray(row));
                }

                int realLen = ((Long) qTotal.getSingleResult()).intValue();
                return new WorkflowsInfo(wfBeansList, start, len, realLen);
            }
        });
    }

    /**
     * Map a filter key to the WorkflowJobBean field it filters on.
     *
     * @param filterName one of the <code>OozieClient.FILTER_*</code> names
     * @return the field name, or <code>null</code> if the key is not a supported filter
     */
    private static String getFilterColumn(String filterName) {
        if (filterName.equals(OozieClient.FILTER_GROUP)) {
            return "group";
        }
        if (filterName.equals(OozieClient.FILTER_STATUS)) {
            return "status";
        }
        if (filterName.equals(OozieClient.FILTER_NAME)) {
            return "appName";
        }
        if (filterName.equals(OozieClient.FILTER_USER)) {
            return "user";
        }
        return null;
    }

    /**
     * Append <code>w.&lt;column&gt; IN (:p0, :p1, ...)</code> to the where clause and record the parameters.
     * The clause is introduced with <code>where</code> if it is the first one and with <code>and</code>
     * otherwise. Parameter names are the column name followed by a counter shared by all columns. A null or
     * empty value list adds nothing.
     *
     * @param where where clause being built
     * @param column WorkflowJobBean field name
     * @param values accepted values for the field
     * @param paramNames receives one generated parameter name per value
     * @param paramValues receives the values, index-aligned with <code>paramNames</code>
     */
    private static void appendInClause(StringBuilder where, String column, List<String> values,
            List<String> paramNames, List<String> paramValues) {
        if (values == null || values.isEmpty()) {
            return;
        }
        where.append(paramNames.isEmpty() ? " where w." : " and w.").append(column).append(" IN (");
        for (int i = 0; i < values.size(); i++) {
            String paramName = column + paramNames.size();
            if (i > 0) {
                where.append(", ");
            }
            where.append(':').append(paramName);
            paramNames.add(paramName);
            paramValues.add(values.get(i));
        }
        where.append(")");
    }

    /**
     * Load the Workflow and all its Actions and return copies in a new WorkflowJobBean.
     *
     * @param id Workflow Id, must not be empty
     * @return Workflow Bean with its actions set
     * @throws StoreException with {@link ErrorCode#E0604} if the Workflow doesn't exist, or if a query fails
     */
    public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        return doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
                WorkflowJobBean wfBean = getWorkflowforInfo(id, false);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                wfBean.setActions(getActionsForWorkflow(id, false));
                return wfBean;
            }
        });
    }

    /**
     * Load the Workflow and a page of its Actions and return copies in a new WorkflowJobBean.
     *
     * @param id Workflow Id, must not be empty
     * @param start 1-based offset for the actions page
     * @param len maximum number of Workflow Actions to be returned
     * @return Workflow Bean with the requested actions set
     * @throws StoreException with {@link ErrorCode#E0604} if the Workflow doesn't exist, or if a query fails
     */
    public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len)
            throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        // The instrumentation name is shared with getWorkflowInfo; kept as is so existing metrics do not
        // change.
        return doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
                WorkflowJobBean wfBean = getWorkflowforInfo(id, false);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                wfBean.setActions(getActionsSubsetForWorkflow(id, start, len));
                return wfBean;
            }
        });
    }

    /**
     * Get the Workflow ID for the given external ID.
     *
     * @param externalId external ID, must not be empty
     * @return the last id returned by the <code>GET_WORKFLOW_ID_FOR_EXTERNAL_ID</code> named query, or an
     * empty string if the query returns nothing
     * @throws StoreException if the query fails
     */
    public String getWorkflowIdForExternalId(final String externalId) throws StoreException {
        ParamChecker.notEmpty(externalId, "externalId");
        return doOperation("getWorkflowIdForExternalId", new Callable<String>() {
            public String call() throws SQLException, StoreException {
                Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID");
                q.setParameter("externalId", externalId);
                List<String> ids = q.getResultList();
                if (ids.isEmpty()) {
                    return "";
                }
                return ids.get(ids.size() - 1);
            }
        });
    }

    /**
     * Purge workflows returned by the <code>GET_COMPLETED_WORKFLOWS_OLDER_THAN</code> named query, together
     * with their actions.
     *
     * @param olderThanDays number of days; (now - olderThanDays) is the query's <code>endTime</code> parameter
     * @param limit maximum number of workflows purged by one call
     * @throws StoreException if a query or removal fails
     */
    public void purge(final long olderThanDays, final int limit) throws StoreException {
        doOperation("purge", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
                Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN");
                q.setParameter("endTime", maxEndTime);
                q.setMaxResults(limit);
                List<WorkflowJobBean> workflows = q.getResultList();
                int actionDeleted = 0;
                for (WorkflowJobBean w : workflows) {
                    String wfId = w.getId();
                    entityManager.remove(w);
                    Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
                    g.setParameter("wfId", wfId);
                    actionDeleted += g.executeUpdate();
                }
                XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions "
                        + actionDeleted);
                return null;
            }
        });
    }

    /**
     * Run a store command, time it and translate failures into {@link StoreException}s.
     * <p/>
     * The timing is only recorded when the command succeeds. A <code>StoreException</code> is rethrown as
     * is, a <code>SQLException</code> becomes {@link ErrorCode#E0611} and anything else becomes
     * {@link ErrorCode#E0607}.
     *
     * @param name operation name used for instrumentation and error messages
     * @param command the work to execute
     * @return whatever the command returns
     * @throws StoreException if the command fails
     */
    private <V> V doOperation(String name, Callable<V> command) throws StoreException {
        try {
            Instrumentation.Cron cron = new Instrumentation.Cron();
            cron.start();
            V retVal;
            try {
                retVal = command.call();
            }
            finally {
                cron.stop();
            }
            Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
            return retVal;
        }
        catch (StoreException ex) {
            throw ex;
        }
        catch (SQLException ex) {
            throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new StoreException(ErrorCode.E0607, name, ex.getMessage(), ex);
        }
        catch (Exception e) {
            throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
        }
    }

    /**
     * Return the first bean found by the <code>GET_WORKFLOW</code> named query, without copying it.
     *
     * @param id workflow id
     * @param locking currently ignored
     * @return the bean, or <code>null</code> if the query returns nothing
     */
    private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException,
            InterruptedException, StoreException {
        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
        q.setParameter("id", id);
        List<WorkflowJobBean> w = q.getResultList();
        return w.isEmpty() ? null : w.get(0);
    }

    /**
     * Return a copy of the first bean found by the <code>GET_WORKFLOW</code> named query.
     *
     * @param id workflow id
     * @param locking currently ignored
     * @return a new bean, or <code>null</code> if the query returns nothing
     */
    private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException,
            InterruptedException, StoreException {
        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
        q.setParameter("id", id);
        List<WorkflowJobBean> w = q.getResultList();
        if (w.isEmpty()) {
            return null;
        }
        return getBeanForRunningWorkflow(w.get(0));
    }

    /**
     * Copy the listed properties of a workflow bean into a new WorkflowJobBean instance.
     *
     * @param w source bean
     * @return the new bean
     */
    private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException {
        WorkflowJobBean wfBean = new WorkflowJobBean();
        wfBean.setId(w.getId());
        wfBean.setAppName(w.getAppName());
        wfBean.setAppPath(w.getAppPath());
        wfBean.setConf(w.getConf());
        wfBean.setGroup(w.getGroup());
        wfBean.setRun(w.getRun());
        wfBean.setUser(w.getUser());
        wfBean.setAuthToken(w.getAuthToken());
        wfBean.setCreatedTime(w.getCreatedTime());
        wfBean.setEndTime(w.getEndTime());
        wfBean.setExternalId(w.getExternalId());
        wfBean.setLastModifiedTime(w.getLastModifiedTime());
        wfBean.setLogToken(w.getLogToken());
        wfBean.setProtoActionConf(w.getProtoActionConf());
        wfBean.setSlaXml(w.getSlaXml());
        wfBean.setStartTime(w.getStartTime());
        wfBean.setStatus(w.getStatus());
        wfBean.setWfInstance(w.getWfInstance());
        return wfBean;
    }

    /**
     * Build a WorkflowJobBean from one result row of the {@link #SELECT_STR} projection. The id is always
     * set; every other column is only set when it is not null.
     *
     * @param arr row values in the column order of the select statement
     * @return the new bean
     */
    private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {
        WorkflowJobBean wfBean = new WorkflowJobBean();
        wfBean.setId((String) arr[0]);
        if (arr[1] != null) {
            wfBean.setAppName((String) arr[1]);
        }
        if (arr[2] != null) {
            wfBean.setStatus(Status.valueOf((String) arr[2]));
        }
        if (arr[3] != null) {
            wfBean.setRun((Integer) arr[3]);
        }
        if (arr[4] != null) {
            wfBean.setUser((String) arr[4]);
        }
        if (arr[5] != null) {
            wfBean.setGroup((String) arr[5]);
        }
        if (arr[6] != null) {
            wfBean.setCreatedTime((Timestamp) arr[6]);
        }
        if (arr[7] != null) {
            wfBean.setStartTime((Timestamp) arr[7]);
        }
        if (arr[8] != null) {
            wfBean.setLastModifiedTime((Timestamp) arr[8]);
        }
        if (arr[9] != null) {
            wfBean.setEndTime((Timestamp) arr[9]);
        }
        return wfBean;
    }

    /**
     * Copy the listed properties of an action bean into a new WorkflowActionBean instance.
     *
     * @param a source bean, may be null
     * @return the new bean, or <code>null</code> if the source is null
     */
    private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException {
        if (a == null) {
            return null;
        }
        WorkflowActionBean action = new WorkflowActionBean();
        action.setId(a.getId());
        action.setConf(a.getConf());
        action.setConsoleUrl(a.getConsoleUrl());
        action.setData(a.getData());
        action.setStats(a.getStats());
        action.setExternalChildIDs(a.getExternalChildIDs());
        action.setErrorInfo(a.getErrorCode(), a.getErrorMessage());
        action.setExternalId(a.getExternalId());
        action.setExternalStatus(a.getExternalStatus());
        action.setName(a.getName());
        action.setCred(a.getCred());
        action.setRetries(a.getRetries());
        action.setTrackerUri(a.getTrackerUri());
        action.setTransition(a.getTransition());
        action.setType(a.getType());
        action.setEndTime(a.getEndTime());
        action.setExecutionPath(a.getExecutionPath());
        action.setLastCheckTime(a.getLastCheckTime());
        action.setLogToken(a.getLogToken());
        if (a.getPending()) {
            action.setPending();
        }
        action.setPendingAge(a.getPendingAge());
        action.setSignalValue(a.getSignalValue());
        action.setSlaXml(a.getSlaXml());
        action.setStartTime(a.getStartTime());
        action.setStatus(a.getStatus());
        action.setJobId(a.getWfId());
        action.setUserRetryCount(a.getUserRetryCount());
        action.setUserRetryInterval(a.getUserRetryInterval());
        action.setUserRetryMax(a.getUserRetryMax());
        return action;
    }

    /**
     * Bind the workflow bean's values to the parameters of an update query. The <code>lastModTime</code>
     * parameter is always set to the current time rather than taken from the bean.
     *
     * @param wfBean source of the values
     * @param q query whose parameters are set
     */
    private void setWFQueryParameters(WorkflowJobBean wfBean, Query q) {
        q.setParameter("appName", wfBean.getAppName());
        q.setParameter("appPath", wfBean.getAppPath());
        q.setParameter("conf", wfBean.getConf());
        q.setParameter("groupName", wfBean.getGroup());
        q.setParameter("run", wfBean.getRun());
        q.setParameter("user", wfBean.getUser());
        q.setParameter("authToken", wfBean.getAuthToken());
        q.setParameter("createdTime", wfBean.getCreatedTimestamp());
        q.setParameter("endTime", wfBean.getEndTimestamp());
        q.setParameter("externalId", wfBean.getExternalId());
        q.setParameter("lastModTime", new Date());
        q.setParameter("logToken", wfBean.getLogToken());
        q.setParameter("protoActionConf", wfBean.getProtoActionConf());
        q.setParameter("slaXml", wfBean.getSlaXml());
        q.setParameter("startTime", wfBean.getStartTimestamp());
        q.setParameter("status", wfBean.getStatusStr());
        q.setParameter("wfInstance", wfBean.getWfInstance());
    }

    /**
     * Bind the action bean's values to the parameters of an update query. The pending flag is bound as
     * <code>1</code> or <code>0</code>.
     *
     * @param aBean source of the values
     * @param q query whose parameters are set
     */
    private void setActionQueryParameters(WorkflowActionBean aBean, Query q) {
        q.setParameter("conf", aBean.getConf());
        q.setParameter("consoleUrl", aBean.getConsoleUrl());
        q.setParameter("data", aBean.getData());
        q.setParameter("stats", aBean.getStats());
        q.setParameter("externalChildIDs", aBean.getExternalChildIDs());
        q.setParameter("errorCode", aBean.getErrorCode());
        q.setParameter("errorMessage", aBean.getErrorMessage());
        q.setParameter("externalId", aBean.getExternalId());
        q.setParameter("externalStatus", aBean.getExternalStatus());
        q.setParameter("name", aBean.getName());
        q.setParameter("cred", aBean.getCred());
        q.setParameter("retries", aBean.getRetries());
        q.setParameter("trackerUri", aBean.getTrackerUri());
        q.setParameter("transition", aBean.getTransition());
        q.setParameter("type", aBean.getType());
        q.setParameter("endTime", aBean.getEndTimestamp());
        q.setParameter("executionPath", aBean.getExecutionPath());
        q.setParameter("lastCheckTime", aBean.getLastCheckTimestamp());
        q.setParameter("logToken", aBean.getLogToken());
        q.setParameter("pending", aBean.isPending() ? 1 : 0);
        q.setParameter("pendingAge", aBean.getPendingAgeTimestamp());
        q.setParameter("signalValue", aBean.getSignalValue());
        q.setParameter("slaXml", aBean.getSlaXml());
        q.setParameter("startTime", aBean.getStartTimestamp());
        q.setParameter("status", aBean.getStatusStr());
        q.setParameter("wfId", aBean.getWfId());
    }
}