001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.store; 019 020import java.sql.SQLException; 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.Callable; 027 028import javax.persistence.EntityManager; 029import javax.persistence.Query; 030 031import org.apache.oozie.CoordinatorActionBean; 032import org.apache.oozie.CoordinatorJobBean; 033import org.apache.oozie.CoordinatorJobInfo; 034import org.apache.oozie.ErrorCode; 035import org.apache.oozie.client.Job.Status; 036import org.apache.oozie.client.CoordinatorJob.Timeunit; 037import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 038import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 040import org.apache.oozie.executor.jpa.JPAExecutorException; 041import org.apache.oozie.service.InstrumentationService; 042import org.apache.oozie.service.Services; 043import org.apache.oozie.util.DateUtils; 044import org.apache.oozie.util.Instrumentation; 045import org.apache.oozie.util.ParamChecker; 046import org.apache.oozie.util.XLog; 047import org.apache.oozie.workflow.WorkflowException; 048import org.apache.openjpa.persistence.OpenJPAPersistence; 049import org.apache.openjpa.persistence.OpenJPAQuery; 050import org.apache.openjpa.persistence.jdbc.FetchDirection; 051import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan; 052import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm; 053import org.apache.openjpa.persistence.jdbc.ResultSetType; 054 055/** 056 * DB Implementation of Coord Store 057 */ 058public class CoordinatorStore extends Store { 059 private final XLog log = XLog.getLog(getClass()); 060 061 private EntityManager entityManager; 062 private static final String INSTR_GROUP = "db"; 063 public static final int LOCK_TIMEOUT = 50000; 064 private static final long DAY_IN_MS = 24 * 60 * 60 * 1000; 065 066 public CoordinatorStore(boolean selectForUpdate) throws StoreException { 067 super(); 068 entityManager = getEntityManager(); 069 } 070 071 public CoordinatorStore(Store store, boolean selectForUpdate) throws StoreException { 072 super(store); 073 entityManager = getEntityManager(); 074 } 075 076 /** 077 * Create a CoordJobBean. It also creates the process instance for the job. 078 * 079 * @param workflow workflow bean 080 * @throws StoreException 081 */ 082 083 public void insertCoordinatorJob(final CoordinatorJobBean coordinatorJob) throws StoreException { 084 ParamChecker.notNull(coordinatorJob, "coordinatorJob"); 085 086 doOperation("insertCoordinatorJob", new Callable<Void>() { 087 public Void call() throws StoreException { 088 entityManager.persist(coordinatorJob); 089 return null; 090 } 091 }); 092 } 093 094 /** 095 * Load the CoordinatorJob into a Bean and return it. Also load the Workflow Instance into the bean. And lock the 096 * Workflow depending on the locking parameter. 097 * 098 * @param id Job ID 099 * @param locking Flag for Table Lock 100 * @return CoordinatorJobBean 101 * @throws StoreException 102 */ 103 public CoordinatorJobBean getCoordinatorJob(final String id, final boolean locking) throws StoreException { 104 ParamChecker.notEmpty(id, "CoordJobId"); 105 CoordinatorJobBean cjBean = doOperation("getCoordinatorJob", new Callable<CoordinatorJobBean>() { 106 @SuppressWarnings("unchecked") 107 public CoordinatorJobBean call() throws StoreException { 108 Query q = entityManager.createNamedQuery("GET_COORD_JOB"); 109 q.setParameter("id", id); 110 /* 111 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q); 112 * // q.setHint("openjpa.FetchPlan.ReadLockMode","WRITE"); 113 * FetchPlan fetch = oq.getFetchPlan(); 114 * fetch.setReadLockMode(LockModeType.WRITE); 115 * fetch.setLockTimeout(-1); // 1 second } 116 */ 117 List<CoordinatorJobBean> cjBeans = q.getResultList(); 118 119 if (cjBeans.size() > 0) { 120 return cjBeans.get(0); 121 } 122 else { 123 throw new StoreException(ErrorCode.E0604, id); 124 } 125 } 126 }); 127 128 cjBean.setStatus(cjBean.getStatus()); 129 return cjBean; 130 } 131 132 /** 133 * Get a list of Coordinator Jobs that should be materialized. Jobs with a 'last materialized time' older than the 134 * argument will be returned. 135 * 136 * @param d Date 137 * @return List of Coordinator Jobs that have a last materialized time older than input date 138 * @throws StoreException 139 */ 140 public List<CoordinatorJobBean> getCoordinatorJobsToBeMaterialized(final Date d, final int limit) 141 throws StoreException { 142 143 ParamChecker.notNull(d, "Coord Job Materialization Date"); 144 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsToBeMaterialized", 145 new Callable<List<CoordinatorJobBean>>() { 146 public List<CoordinatorJobBean> call() throws StoreException { 147 148 List<CoordinatorJobBean> cjBeans; 149 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>(); 150 try { 151 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN"); 152 q.setParameter("matTime", new Timestamp(d.getTime())); 153 if (limit > 0) { 154 q.setMaxResults(limit); 155 } 156 /* 157 OpenJPAQuery oq = OpenJPAPersistence.cast(q); 158 FetchPlan fetch = oq.getFetchPlan(); 159 fetch.setReadLockMode(LockModeType.WRITE); 160 fetch.setLockTimeout(-1); // no limit 161 */ 162 cjBeans = q.getResultList(); 163 // copy results to a new object 164 for (CoordinatorJobBean j : cjBeans) { 165 jobList.add(j); 166 } 167 } 168 catch (IllegalStateException e) { 169 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 170 } 171 return jobList; 172 173 } 174 }); 175 return cjBeans; 176 } 177 178 /** 179 * A list of Coordinator Jobs that are matched with the status and have last materialized time' older than 180 * checkAgeSecs will be returned. 181 * 182 * @param checkAgeSecs Job age in Seconds 183 * @param status Coordinator Job Status 184 * @param limit Number of results to return 185 * @param locking Flag for Table Lock 186 * @return List of Coordinator Jobs that are matched with the parameters. 187 * @throws StoreException 188 */ 189 public List<CoordinatorJobBean> getCoordinatorJobsOlderThanStatus(final long checkAgeSecs, final String status, 190 final int limit, final boolean locking) throws StoreException { 191 192 ParamChecker.notNull(status, "Coord Job Status"); 193 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsOlderThanStatus", 194 new Callable<List<CoordinatorJobBean>>() { 195 public List<CoordinatorJobBean> call() throws StoreException { 196 197 List<CoordinatorJobBean> cjBeans; 198 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>(); 199 try { 200 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN_STATUS"); 201 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 202 q.setParameter("lastModTime", ts); 203 q.setParameter("status", status); 204 if (limit > 0) { 205 q.setMaxResults(limit); 206 } 207 /* 208 * if (locking) { OpenJPAQuery oq = 209 * OpenJPAPersistence.cast(q); FetchPlan fetch = 210 * oq.getFetchPlan(); 211 * fetch.setReadLockMode(LockModeType.WRITE); 212 * fetch.setLockTimeout(-1); // no limit } 213 */ 214 cjBeans = q.getResultList(); 215 for (CoordinatorJobBean j : cjBeans) { 216 jobList.add(j); 217 } 218 } 219 catch (Exception e) { 220 throw new StoreException(ErrorCode.E0603, e.getMessage(), e); 221 } 222 return jobList; 223 224 } 225 }); 226 return cjBeans; 227 } 228 229 /** 230 * Load the CoordinatorAction into a Bean and return it. 231 * 232 * @param id action ID 233 * @return CoordinatorActionBean 234 * @throws StoreException 235 */ 236 public CoordinatorActionBean getCoordinatorAction(final String id, final boolean locking) throws StoreException { 237 ParamChecker.notEmpty(id, "actionID"); 238 CoordinatorActionBean caBean = doOperation("getCoordinatorAction", new Callable<CoordinatorActionBean>() { 239 public CoordinatorActionBean call() throws StoreException { 240 Query q = entityManager.createNamedQuery("GET_COORD_ACTION"); 241 q.setParameter("id", id); 242 OpenJPAQuery oq = OpenJPAPersistence.cast(q); 243 /* 244 * if (locking) { //q.setHint("openjpa.FetchPlan.ReadLockMode", 245 * "WRITE"); FetchPlan fetch = oq.getFetchPlan(); 246 * fetch.setReadLockMode(LockModeType.WRITE); 247 * fetch.setLockTimeout(-1); // no limit } 248 */ 249 250 CoordinatorActionBean action = null; 251 List<CoordinatorActionBean> actions = q.getResultList(); 252 if (actions.size() > 0) { 253 action = actions.get(0); 254 } 255 else { 256 throw new StoreException(ErrorCode.E0605, id); 257 } 258 259 /* 260 * if (locking) return action; else 261 */ 262 return getBeanForRunningCoordAction(action); 263 } 264 }); 265 return caBean; 266 } 267 268 /** 269 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <= 270 * concurrency number. Sort returned actions based on execution order (FIFO, LIFO, LAST_ONLY, NONE) 271 * 272 * @param id job ID 273 * @param numResults number of results to return 274 * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY, NONE 275 * @return List of CoordinatorActionBean 276 * @throws StoreException 277 */ 278 public List<CoordinatorActionBean> getCoordinatorActionsForJob(final String id, final int numResults, 279 final String executionOrder) throws StoreException { 280 ParamChecker.notEmpty(id, "jobID"); 281 List<CoordinatorActionBean> caBeans = doOperation("getCoordinatorActionsForJob", 282 new Callable<List<CoordinatorActionBean>>() { 283 public List<CoordinatorActionBean> call() throws StoreException { 284 285 List<CoordinatorActionBean> caBeans; 286 Query q; 287 // check if executionOrder is FIFO, LIFO, NONE or LAST_ONLY 288 if (executionOrder.equalsIgnoreCase("FIFO")) { 289 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO"); 290 } 291 else { 292 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO"); 293 } 294 q.setParameter("jobId", id); 295 // if executionOrder is LAST_ONLY, only retrieve first 296 // record in LIFO, 297 // otherwise, use numResults if it is positive. 298 if (executionOrder.equalsIgnoreCase("LAST_ONLY")) { 299 q.setMaxResults(1); 300 } 301 else { 302 if (numResults > 0) { 303 q.setMaxResults(numResults); 304 } 305 } 306 caBeans = q.getResultList(); 307 return caBeans; 308 } 309 }); 310 return caBeans; 311 } 312 313 /** 314 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <= 315 * concurrency number. 316 * 317 * @param id job ID 318 * @return Number of running actions 319 * @throws StoreException 320 */ 321 public int getCoordinatorRunningActionsCount(final String id) throws StoreException { 322 ParamChecker.notEmpty(id, "jobID"); 323 Integer cnt = doOperation("getCoordinatorRunningActionsCount", new Callable<Integer>() { 324 public Integer call() throws SQLException { 325 326 Query q = entityManager.createNamedQuery("GET_COORD_RUNNING_ACTIONS_COUNT"); 327 328 q.setParameter("jobId", id); 329 Long count = (Long) q.getSingleResult(); 330 return Integer.valueOf(count.intValue()); 331 } 332 }); 333 return cnt.intValue(); 334 } 335 336 /** 337 * Create a new Action record in the ACTIONS table with the given Bean. 338 * 339 * @param action WorkflowActionBean 340 * @throws StoreException If the action is already present 341 */ 342 public void insertCoordinatorAction(final CoordinatorActionBean action) throws StoreException { 343 ParamChecker.notNull(action, "CoordinatorActionBean"); 344 doOperation("insertCoordinatorAction", new Callable<Void>() { 345 public Void call() throws StoreException { 346 entityManager.persist(action); 347 return null; 348 } 349 }); 350 } 351 352 /** 353 * Update the given action bean to DB. 354 * 355 * @param action Action Bean 356 * @throws StoreException if action doesn't exist 357 */ 358 public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException, JPAExecutorException { 359 ParamChecker.notNull(action, "CoordinatorActionBean"); 360 doOperation("updateCoordinatorAction", new Callable<Void>() { 361 public Void call() throws StoreException, JPAExecutorException { 362 CoordActionQueryExecutor.getInstance().executeUpdate( 363 CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION, action); 364 return null; 365 } 366 }); 367 } 368 369 /** 370 * Update the given action bean to DB. 371 * 372 * @param action Action Bean 373 * @throws StoreException if action doesn't exist 374 */ 375 public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException, JPAExecutorException { 376 ParamChecker.notNull(action, "CoordinatorActionBean"); 377 doOperation("updateCoordinatorAction", new Callable<Void>() { 378 public Void call() throws StoreException, JPAExecutorException { 379 CoordActionQueryExecutor.getInstance().executeUpdate( 380 CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, action); 381 return null; 382 } 383 }); 384 } 385 386 /** 387 * Update the given coordinator job bean to DB. 388 * 389 * @param jobbean Coordinator Job Bean 390 * @throws StoreException if action doesn't exist 391 */ 392 public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException { 393 ParamChecker.notNull(job, "CoordinatorJobBean"); 394 doOperation("updateJob", new Callable<Void>() { 395 public Void call() throws StoreException, JPAExecutorException { 396 CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); 397 return null; 398 } 399 }); 400 } 401 402 public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException { 403 ParamChecker.notNull(job, "CoordinatorJobBean"); 404 doOperation("updateJobStatus", new Callable<Void>() { 405 public Void call() throws StoreException, JPAExecutorException { 406 CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_MODTIME, job); 407 return null; 408 } 409 }); 410 } 411 412 private <V> V doOperation(String name, Callable<V> command) throws StoreException { 413 try { 414 Instrumentation.Cron cron = new Instrumentation.Cron(); 415 cron.start(); 416 V retVal; 417 try { 418 retVal = command.call(); 419 } 420 finally { 421 cron.stop(); 422 } 423 Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron); 424 return retVal; 425 } 426 catch (StoreException ex) { 427 throw ex; 428 } 429 catch (SQLException ex) { 430 throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex); 431 } 432 catch (Exception e) { 433 throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e); 434 } 435 } 436 437 /** 438 * Purge the coordinators completed older than given days. 439 * 440 * @param olderThanDays number of days for which to preserve the coordinators 441 * @param limit maximum number of coordinator jobs to be purged 442 * @throws StoreException 443 */ 444 public void purge(final long olderThanDays, final int limit) throws StoreException { 445 doOperation("coord-purge", new Callable<Void>() { 446 public Void call() throws SQLException, StoreException, WorkflowException { 447 Timestamp lastModTm = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS)); 448 Query jobQ = entityManager.createNamedQuery("GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS"); 449 jobQ.setParameter("lastModTime", lastModTm); 450 jobQ.setMaxResults(limit); 451 List<CoordinatorJobBean> coordJobs = jobQ.getResultList(); 452 453 int actionDeleted = 0; 454 if (coordJobs.size() != 0) { 455 for (CoordinatorJobBean coord : coordJobs) { 456 String jobId = coord.getId(); 457 entityManager.remove(coord); 458 Query g = entityManager.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR"); 459 g.setParameter("jobId", jobId); 460 actionDeleted += g.executeUpdate(); 461 } 462 } 463 464 XLog.getLog(getClass()).debug("ENDED Coord Purge deleted jobs :" + coordJobs.size() + " and actions " + actionDeleted); 465 return null; 466 } 467 }); 468 } 469 470 public void commit() throws StoreException { 471 } 472 473 public void close() throws StoreException { 474 } 475 476 public CoordinatorJobBean getCoordinatorJobs(String id) { 477 // TODO Auto-generated method stub 478 return null; 479 } 480 481 public CoordinatorJobInfo getCoordinatorInfo(final Map<String, List<String>> filter, final int start, final int len) 482 throws StoreException { 483 484 CoordinatorJobInfo coordJobInfo = doOperation("getCoordinatorJobInfo", new Callable<CoordinatorJobInfo>() { 485 public CoordinatorJobInfo call() throws SQLException, StoreException { 486 List<String> orArray = new ArrayList<String>(); 487 List<String> colArray = new ArrayList<String>(); 488 List<String> valArray = new ArrayList<String>(); 489 StringBuilder sb = new StringBuilder(""); 490 491 StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr, 492 StoreStatusFilter.coordCountStr); 493 494 int realLen = 0; 495 496 Query q = null; 497 Query qTotal = null; 498 if (orArray.size() == 0) { 499 q = entityManager.createNamedQuery("GET_COORD_JOBS_COLUMNS"); 500 q.setFirstResult(start - 1); 501 q.setMaxResults(len); 502 qTotal = entityManager.createNamedQuery("GET_COORD_JOBS_COUNT"); 503 } 504 else { 505 StringBuilder sbTotal = new StringBuilder(sb); 506 sb.append(" order by w.createdTimestamp desc "); 507 XLog.getLog(getClass()).debug("Created String is **** " + sb.toString()); 508 q = entityManager.createQuery(sb.toString()); 509 q.setFirstResult(start - 1); 510 q.setMaxResults(len); 511 qTotal = entityManager.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr, 512 StoreStatusFilter.coordCountStr)); 513 } 514 515 for (int i = 0; i < orArray.size(); i++) { 516 q.setParameter(colArray.get(i), valArray.get(i)); 517 qTotal.setParameter(colArray.get(i), valArray.get(i)); 518 } 519 520 OpenJPAQuery kq = OpenJPAPersistence.cast(q); 521 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan(); 522 fetch.setFetchBatchSize(20); 523 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE); 524 fetch.setFetchDirection(FetchDirection.FORWARD); 525 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST); 526 List<?> resultList = q.getResultList(); 527 List<Object[]> objectArrList = (List<Object[]>) resultList; 528 List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>(); 529 530 for (Object[] arr : objectArrList) { 531 CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr); 532 coordBeansList.add(ww); 533 } 534 535 realLen = ((Long) qTotal.getSingleResult()).intValue(); 536 537 return new CoordinatorJobInfo(coordBeansList, start, len, realLen); 538 } 539 }); 540 return coordJobInfo; 541 } 542 543 private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) { 544 CoordinatorJobBean bean = new CoordinatorJobBean(); 545 bean.setId((String) arr[0]); 546 if (arr[1] != null) { 547 bean.setAppName((String) arr[1]); 548 } 549 if (arr[2] != null) { 550 bean.setStatus(Status.valueOf((String) arr[2])); 551 } 552 if (arr[3] != null) { 553 bean.setUser((String) arr[3]); 554 } 555 if (arr[4] != null) { 556 bean.setGroup((String) arr[4]); 557 } 558 if (arr[5] != null) { 559 bean.setStartTime((Timestamp) arr[5]); 560 } 561 if (arr[6] != null) { 562 bean.setEndTime((Timestamp) arr[6]); 563 } 564 if (arr[7] != null) { 565 bean.setAppPath((String) arr[7]); 566 } 567 if (arr[8] != null) { 568 bean.setConcurrency(((Integer) arr[8]).intValue()); 569 } 570 if (arr[9] != null) { 571 bean.setFrequency((String) arr[9]); 572 } 573 if (arr[10] != null) { 574 bean.setLastActionTime((Timestamp) arr[10]); 575 } 576 if (arr[11] != null) { 577 bean.setNextMaterializedTime((Timestamp) arr[11]); 578 } 579 if (arr[13] != null) { 580 bean.setTimeUnit(Timeunit.valueOf((String) arr[13])); 581 } 582 if (arr[14] != null) { 583 bean.setTimeZone((String) arr[14]); 584 } 585 if (arr[15] != null) { 586 bean.setTimeout((Integer) arr[15]); 587 } 588 return bean; 589 } 590 591 /** 592 * Loads all actions for the given Coordinator job. 593 * 594 * @param jobId coordinator job id 595 * @param locking true if Actions are to be locked 596 * @return A List of CoordinatorActionBean 597 * @throws StoreException 598 */ 599 public Integer getActionsForCoordinatorJob(final String jobId, final boolean locking) 600 throws StoreException { 601 ParamChecker.notEmpty(jobId, "CoordinatorJobID"); 602 Integer actionsCount = doOperation("getActionsForCoordinatorJob", 603 new Callable<Integer>() { 604 @SuppressWarnings("unchecked") 605 public Integer call() throws StoreException { 606 List<CoordinatorActionBean> actions; 607 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>(); 608 try { 609 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB"); 610 q.setParameter("jobId", jobId); 611 /* 612 * if (locking) { // 613 * q.setHint("openjpa.FetchPlan.ReadLockMode", // 614 * "READ"); OpenJPAQuery oq = 615 * OpenJPAPersistence.cast(q); JDBCFetchPlan fetch = 616 * (JDBCFetchPlan) oq.getFetchPlan(); 617 * fetch.setReadLockMode(LockModeType.WRITE); 618 * fetch.setLockTimeout(-1); // 1 second } 619 */ 620 Long count = (Long) q.getSingleResult(); 621 return Integer.valueOf(count.intValue()); 622 /*actions = q.getResultList(); 623 for (CoordinatorActionBean a : actions) { 624 CoordinatorActionBean aa = getBeanForRunningCoordAction(a); 625 actionList.add(aa); 626 }*/ 627 } 628 catch (IllegalStateException e) { 629 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 630 } 631 /* 632 * if (locking) { return actions; } else { 633 */ 634 635 // } 636 } 637 }); 638 return actionsCount; 639 } 640 641 /** 642 * Loads given number of actions for the given Coordinator job. 643 * 644 * @param jobId coordinator job id 645 * @param start offset for select statement 646 * @param len number of Workflow Actions to be returned 647 * @return A List of CoordinatorActionBean 648 * @throws StoreException 649 */ 650 public List<CoordinatorActionBean> getActionsSubsetForCoordinatorJob(final String jobId, final int start, 651 final int len) throws StoreException { 652 ParamChecker.notEmpty(jobId, "CoordinatorJobID"); 653 List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob", 654 new Callable<List<CoordinatorActionBean>>() { 655 @SuppressWarnings("unchecked") 656 public List<CoordinatorActionBean> call() throws StoreException { 657 List<CoordinatorActionBean> actions; 658 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>(); 659 try { 660 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB"); 661 q.setParameter("jobId", jobId); 662 q.setFirstResult(start - 1); 663 q.setMaxResults(len); 664 actions = q.getResultList(); 665 for (CoordinatorActionBean a : actions) { 666 CoordinatorActionBean aa = getBeanForRunningCoordAction(a); 667 actionList.add(aa); 668 } 669 } 670 catch (IllegalStateException e) { 671 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 672 } 673 return actionList; 674 } 675 }); 676 return actions; 677 } 678 679 protected CoordinatorActionBean getBeanForRunningCoordAction(CoordinatorActionBean a) { 680 if (a != null) { 681 CoordinatorActionBean action = new CoordinatorActionBean(); 682 action.setId(a.getId()); 683 action.setActionNumber(a.getActionNumber()); 684 action.setActionXmlBlob(a.getActionXmlBlob()); 685 action.setConsoleUrl(a.getConsoleUrl()); 686 action.setCreatedConfBlob(a.getCreatedConfBlob()); 687 action.setErrorCode(a.getErrorCode()); 688 action.setErrorMessage(a.getErrorMessage()); 689 action.setExternalStatus(a.getExternalStatus()); 690 action.setMissingDependenciesBlob(a.getMissingDependenciesBlob()); 691 action.setRunConfBlob(a.getRunConfBlob()); 692 action.setTimeOut(a.getTimeOut()); 693 action.setTrackerUri(a.getTrackerUri()); 694 action.setType(a.getType()); 695 action.setCreatedTime(a.getCreatedTime()); 696 action.setExternalId(a.getExternalId()); 697 action.setJobId(a.getJobId()); 698 action.setLastModifiedTime(a.getLastModifiedTime()); 699 action.setNominalTime(a.getNominalTime()); 700 action.setSlaXmlBlob(a.getSlaXmlBlob()); 701 action.setStatus(a.getStatus()); 702 return action; 703 } 704 return null; 705 } 706 707 public CoordinatorActionBean getAction(String id, boolean b) { 708 return null; 709 } 710 711 712 public List<CoordinatorActionBean> getRunningActionsForCoordinatorJob(final String jobId, final boolean locking) 713 throws StoreException { 714 ParamChecker.notEmpty(jobId, "CoordinatorJobID"); 715 List<CoordinatorActionBean> actions = doOperation("getRunningActionsForCoordinatorJob", 716 new Callable<List<CoordinatorActionBean>>() { 717 @SuppressWarnings("unchecked") 718 public List<CoordinatorActionBean> call() throws StoreException { 719 List<CoordinatorActionBean> actions; 720 try { 721 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_FOR_COORD_JOB"); 722 q.setParameter("jobId", jobId); 723 /* 724 * if (locking) { 725 * q.setHint("openjpa.FetchPlan.ReadLockMode", 726 * "READ"); OpenJPAQuery oq = 727 * OpenJPAPersistence.cast(q); FetchPlan fetch = 728 * oq.getFetchPlan(); 729 * fetch.setReadLockMode(LockModeType.WRITE); 730 * fetch.setLockTimeout(-1); // no limit } 731 */ 732 actions = q.getResultList(); 733 return actions; 734 } 735 catch (IllegalStateException e) { 736 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 737 } 738 } 739 }); 740 return actions; 741 } 742 743 public List<CoordinatorActionBean> getRunningActionsOlderThan(final long checkAgeSecs, final boolean locking) 744 throws StoreException { 745 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan", 746 new Callable<List<CoordinatorActionBean>>() { 747 @SuppressWarnings("unchecked") 748 public List<CoordinatorActionBean> call() throws StoreException { 749 List<CoordinatorActionBean> actions; 750 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 751 try { 752 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_OLDER_THAN"); 753 q.setParameter("lastModifiedTime", ts); 754 /* 755 * if (locking) { OpenJPAQuery oq = 756 * OpenJPAPersistence.cast(q); FetchPlan fetch = 757 * oq.getFetchPlan(); 758 * fetch.setReadLockMode(LockModeType.WRITE); 759 * fetch.setLockTimeout(-1); // no limit } 760 */ 761 actions = q.getResultList(); 762 return actions; 763 } 764 catch (IllegalStateException e) { 765 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 766 } 767 } 768 }); 769 return actions; 770 } 771 772 public List<CoordinatorActionBean> getRecoveryActionsOlderThan(final long checkAgeSecs, final boolean locking) 773 throws StoreException { 774 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan", 775 new Callable<List<CoordinatorActionBean>>() { 776 @SuppressWarnings("unchecked") 777 public List<CoordinatorActionBean> call() throws StoreException { 778 List<CoordinatorActionBean> actions; 779 try { 780 Query q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN"); 781 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 782 q.setParameter("lastModifiedTime", ts); 783 /* 784 * if (locking) { OpenJPAQuery oq = 785 * OpenJPAPersistence.cast(q); FetchPlan fetch = 786 * oq.getFetchPlan(); 787 * fetch.setReadLockMode(LockModeType.WRITE); 788 * fetch.setLockTimeout(-1); // no limit } 789 */ 790 actions = q.getResultList(); 791 return actions; 792 } 793 catch (IllegalStateException e) { 794 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 795 } 796 } 797 }); 798 return actions; 799 } 800 801 /** 802 * Get coordinator action beans for given start date and end date 803 * 804 * @param startDate 805 * @param endDate 806 * @return list of coordinator action beans 807 * @throws StoreException 808 */ 809 public List<CoordinatorActionBean> getCoordActionsForDates(final String jobId, final Date startDate, 810 final Date endDate) throws StoreException { 811 List<CoordinatorActionBean> actions = doOperation("getCoordActionsForDates", 812 new Callable<List<CoordinatorActionBean>>() { 813 @SuppressWarnings("unchecked") 814 public List<CoordinatorActionBean> call() throws StoreException { 815 List<CoordinatorActionBean> actions; 816 try { 817 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_DATES"); 818 q.setParameter("jobId", jobId); 819 q.setParameter("startTime", new Timestamp(startDate.getTime())); 820 q.setParameter("endTime", new Timestamp(endDate.getTime())); 821 actions = q.getResultList(); 822 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>(); 823 for (CoordinatorActionBean a : actions) { 824 CoordinatorActionBean aa = getBeanForRunningCoordAction(a); 825 actionList.add(aa); 826 } 827 return actionList; 828 } 829 catch (IllegalStateException e) { 830 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 831 } 832 } 833 }); 834 return actions; 835 } 836 837 /** 838 * Get coordinator action bean for given date 839 * 840 * @param nominalTime 841 * @return CoordinatorActionBean 842 * @throws StoreException 843 */ 844 public CoordinatorActionBean getCoordActionForNominalTime(final String jobId, final Date nominalTime) 845 throws StoreException { 846 CoordinatorActionBean action = doOperation("getCoordActionForNominalTime", 847 new Callable<CoordinatorActionBean>() { 848 @SuppressWarnings("unchecked") 849 public CoordinatorActionBean call() throws StoreException { 850 List<CoordinatorActionBean> actions; 851 Query q = entityManager.createNamedQuery("GET_ACTION_FOR_NOMINALTIME"); 852 q.setParameter("jobId", jobId); 853 q.setParameter("nominalTime", new Timestamp(nominalTime.getTime())); 854 actions = q.getResultList(); 855 856 CoordinatorActionBean action = null; 857 if (actions.size() > 0) { 858 action = actions.get(0); 859 } 860 else { 861 throw new StoreException(ErrorCode.E0605, DateUtils.formatDateOozieTZ(nominalTime)); 862 } 863 return getBeanForRunningCoordAction(action); 864 } 865 }); 866 return action; 867 } 868 869 public List<String> getRecoveryActionsGroupByJobId(final long checkAgeSecs) throws StoreException { 870 List<String> jobids = doOperation("getRecoveryActionsGroupByJobId", new Callable<List<String>>() { 871 @SuppressWarnings("unchecked") 872 public List<String> call() throws StoreException { 873 List<String> jobids = new ArrayList<String>(); 874 try { 875 Query q = entityManager.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID"); 876 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 877 q.setParameter(1, ts); 878 List<Object[]> list = q.getResultList(); 879 880 for (Object[] arr : list) { 881 if (arr != null && arr[0] != null) { 882 jobids.add((String) arr[0]); 883 } 884 } 885 886 return jobids; 887 } 888 catch (IllegalStateException e) { 889 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 890 } 891 } 892 }); 893 return jobids; 894 } 895}