001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.store; 019 020 import java.sql.SQLException; 021 import java.sql.Timestamp; 022 import java.util.ArrayList; 023 import java.util.Date; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.concurrent.Callable; 027 028 import javax.persistence.EntityManager; 029 import javax.persistence.Query; 030 031 import org.apache.oozie.CoordinatorActionBean; 032 import org.apache.oozie.CoordinatorJobBean; 033 import org.apache.oozie.CoordinatorJobInfo; 034 import org.apache.oozie.ErrorCode; 035 import org.apache.oozie.client.Job.Status; 036 import org.apache.oozie.client.CoordinatorJob.Timeunit; 037 import org.apache.oozie.service.InstrumentationService; 038 import org.apache.oozie.service.Services; 039 import org.apache.oozie.util.DateUtils; 040 import org.apache.oozie.util.Instrumentation; 041 import org.apache.oozie.util.ParamChecker; 042 import org.apache.oozie.util.XLog; 043 import org.apache.oozie.workflow.WorkflowException; 044 import org.apache.openjpa.persistence.OpenJPAPersistence; 045 import org.apache.openjpa.persistence.OpenJPAQuery; 046 import org.apache.openjpa.persistence.jdbc.FetchDirection; 047 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan; 048 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm; 049 import org.apache.openjpa.persistence.jdbc.ResultSetType; 050 051 /** 052 * DB Implementation of Coord Store 053 */ 054 public class CoordinatorStore extends Store { 055 private final XLog log = XLog.getLog(getClass()); 056 057 private EntityManager entityManager; 058 private static final String INSTR_GROUP = "db"; 059 public static final int LOCK_TIMEOUT = 50000; 060 private static final long DAY_IN_MS = 24 * 60 * 60 * 1000; 061 062 public CoordinatorStore(boolean selectForUpdate) throws StoreException { 063 super(); 064 entityManager = getEntityManager(); 065 } 066 067 public CoordinatorStore(Store store, boolean selectForUpdate) throws StoreException { 068 super(store); 069 entityManager = getEntityManager(); 070 } 071 072 /** 073 * Create a CoordJobBean. It also creates the process instance for the job. 074 * 075 * @param workflow workflow bean 076 * @throws StoreException 077 */ 078 079 public void insertCoordinatorJob(final CoordinatorJobBean coordinatorJob) throws StoreException { 080 ParamChecker.notNull(coordinatorJob, "coordinatorJob"); 081 082 doOperation("insertCoordinatorJob", new Callable<Void>() { 083 public Void call() throws StoreException { 084 entityManager.persist(coordinatorJob); 085 return null; 086 } 087 }); 088 } 089 090 /** 091 * Load the CoordinatorJob into a Bean and return it. Also load the Workflow Instance into the bean. And lock the 092 * Workflow depending on the locking parameter. 093 * 094 * @param id Job ID 095 * @param locking Flag for Table Lock 096 * @return CoordinatorJobBean 097 * @throws StoreException 098 */ 099 public CoordinatorJobBean getCoordinatorJob(final String id, final boolean locking) throws StoreException { 100 ParamChecker.notEmpty(id, "CoordJobId"); 101 CoordinatorJobBean cjBean = doOperation("getCoordinatorJob", new Callable<CoordinatorJobBean>() { 102 @SuppressWarnings("unchecked") 103 public CoordinatorJobBean call() throws StoreException { 104 Query q = entityManager.createNamedQuery("GET_COORD_JOB"); 105 q.setParameter("id", id); 106 /* 107 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q); 108 * // q.setHint("openjpa.FetchPlan.ReadLockMode","WRITE"); 109 * FetchPlan fetch = oq.getFetchPlan(); 110 * fetch.setReadLockMode(LockModeType.WRITE); 111 * fetch.setLockTimeout(-1); // 1 second } 112 */ 113 List<CoordinatorJobBean> cjBeans = q.getResultList(); 114 115 if (cjBeans.size() > 0) { 116 return cjBeans.get(0); 117 } 118 else { 119 throw new StoreException(ErrorCode.E0604, id); 120 } 121 } 122 }); 123 124 cjBean.setStatus(cjBean.getStatus()); 125 return cjBean; 126 } 127 128 /** 129 * Get a list of Coordinator Jobs that should be materialized. Jobs with a 'last materialized time' older than the 130 * argument will be returned. 131 * 132 * @param d Date 133 * @return List of Coordinator Jobs that have a last materialized time older than input date 134 * @throws StoreException 135 */ 136 public List<CoordinatorJobBean> getCoordinatorJobsToBeMaterialized(final Date d, final int limit) 137 throws StoreException { 138 139 ParamChecker.notNull(d, "Coord Job Materialization Date"); 140 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsToBeMaterialized", 141 new Callable<List<CoordinatorJobBean>>() { 142 public List<CoordinatorJobBean> call() throws StoreException { 143 144 List<CoordinatorJobBean> cjBeans; 145 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>(); 146 try { 147 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN"); 148 q.setParameter("matTime", new Timestamp(d.getTime())); 149 if (limit > 0) { 150 q.setMaxResults(limit); 151 } 152 /* 153 OpenJPAQuery oq = OpenJPAPersistence.cast(q); 154 FetchPlan fetch = oq.getFetchPlan(); 155 fetch.setReadLockMode(LockModeType.WRITE); 156 fetch.setLockTimeout(-1); // no limit 157 */ 158 cjBeans = q.getResultList(); 159 // copy results to a new object 160 for (CoordinatorJobBean j : cjBeans) { 161 jobList.add(j); 162 } 163 } 164 catch (IllegalStateException e) { 165 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 166 } 167 return jobList; 168 169 } 170 }); 171 return cjBeans; 172 } 173 174 /** 175 * A list of Coordinator Jobs that are matched with the status and have last materialized time' older than 176 * checkAgeSecs will be returned. 177 * 178 * @param checkAgeSecs Job age in Seconds 179 * @param status Coordinator Job Status 180 * @param limit Number of results to return 181 * @param locking Flag for Table Lock 182 * @return List of Coordinator Jobs that are matched with the parameters. 183 * @throws StoreException 184 */ 185 public List<CoordinatorJobBean> getCoordinatorJobsOlderThanStatus(final long checkAgeSecs, final String status, 186 final int limit, final boolean locking) throws StoreException { 187 188 ParamChecker.notNull(status, "Coord Job Status"); 189 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsOlderThanStatus", 190 new Callable<List<CoordinatorJobBean>>() { 191 public List<CoordinatorJobBean> call() throws StoreException { 192 193 List<CoordinatorJobBean> cjBeans; 194 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>(); 195 try { 196 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN_STATUS"); 197 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 198 q.setParameter("lastModTime", ts); 199 q.setParameter("status", status); 200 if (limit > 0) { 201 q.setMaxResults(limit); 202 } 203 /* 204 * if (locking) { OpenJPAQuery oq = 205 * OpenJPAPersistence.cast(q); FetchPlan fetch = 206 * oq.getFetchPlan(); 207 * fetch.setReadLockMode(LockModeType.WRITE); 208 * fetch.setLockTimeout(-1); // no limit } 209 */ 210 cjBeans = q.getResultList(); 211 for (CoordinatorJobBean j : cjBeans) { 212 jobList.add(j); 213 } 214 } 215 catch (Exception e) { 216 throw new StoreException(ErrorCode.E0603, e.getMessage(), e); 217 } 218 return jobList; 219 220 } 221 }); 222 return cjBeans; 223 } 224 225 /** 226 * Load the CoordinatorAction into a Bean and return it. 227 * 228 * @param id action ID 229 * @return CoordinatorActionBean 230 * @throws StoreException 231 */ 232 public CoordinatorActionBean getCoordinatorAction(final String id, final boolean locking) throws StoreException { 233 ParamChecker.notEmpty(id, "actionID"); 234 CoordinatorActionBean caBean = doOperation("getCoordinatorAction", new Callable<CoordinatorActionBean>() { 235 public CoordinatorActionBean call() throws StoreException { 236 Query q = entityManager.createNamedQuery("GET_COORD_ACTION"); 237 q.setParameter("id", id); 238 OpenJPAQuery oq = OpenJPAPersistence.cast(q); 239 /* 240 * if (locking) { //q.setHint("openjpa.FetchPlan.ReadLockMode", 241 * "WRITE"); FetchPlan fetch = oq.getFetchPlan(); 242 * fetch.setReadLockMode(LockModeType.WRITE); 243 * fetch.setLockTimeout(-1); // no limit } 244 */ 245 246 CoordinatorActionBean action = null; 247 List<CoordinatorActionBean> actions = q.getResultList(); 248 if (actions.size() > 0) { 249 action = actions.get(0); 250 } 251 else { 252 throw new StoreException(ErrorCode.E0605, id); 253 } 254 255 /* 256 * if (locking) return action; else 257 */ 258 return getBeanForRunningCoordAction(action); 259 } 260 }); 261 return caBean; 262 } 263 264 /** 265 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <= 266 * concurrency number. Sort returned actions based on execution order (FIFO, LIFO, LAST_ONLY) 267 * 268 * @param id job ID 269 * @param numResults number of results to return 270 * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY 271 * @return List of CoordinatorActionBean 272 * @throws StoreException 273 */ 274 public List<CoordinatorActionBean> getCoordinatorActionsForJob(final String id, final int numResults, 275 final String executionOrder) throws StoreException { 276 ParamChecker.notEmpty(id, "jobID"); 277 List<CoordinatorActionBean> caBeans = doOperation("getCoordinatorActionsForJob", 278 new Callable<List<CoordinatorActionBean>>() { 279 public List<CoordinatorActionBean> call() throws StoreException { 280 281 List<CoordinatorActionBean> caBeans; 282 Query q; 283 // check if executionOrder is FIFO, LIFO, or LAST_ONLY 284 if (executionOrder.equalsIgnoreCase("FIFO")) { 285 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO"); 286 } 287 else { 288 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO"); 289 } 290 q.setParameter("jobId", id); 291 // if executionOrder is LAST_ONLY, only retrieve first 292 // record in LIFO, 293 // otherwise, use numResults if it is positive. 294 if (executionOrder.equalsIgnoreCase("LAST_ONLY")) { 295 q.setMaxResults(1); 296 } 297 else { 298 if (numResults > 0) { 299 q.setMaxResults(numResults); 300 } 301 } 302 caBeans = q.getResultList(); 303 return caBeans; 304 } 305 }); 306 return caBeans; 307 } 308 309 /** 310 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <= 311 * concurrency number. 312 * 313 * @param id job ID 314 * @return Number of running actions 315 * @throws StoreException 316 */ 317 public int getCoordinatorRunningActionsCount(final String id) throws StoreException { 318 ParamChecker.notEmpty(id, "jobID"); 319 Integer cnt = doOperation("getCoordinatorRunningActionsCount", new Callable<Integer>() { 320 public Integer call() throws SQLException { 321 322 Query q = entityManager.createNamedQuery("GET_COORD_RUNNING_ACTIONS_COUNT"); 323 324 q.setParameter("jobId", id); 325 Long count = (Long) q.getSingleResult(); 326 return Integer.valueOf(count.intValue()); 327 } 328 }); 329 return cnt.intValue(); 330 } 331 332 /** 333 * Create a new Action record in the ACTIONS table with the given Bean. 334 * 335 * @param action WorkflowActionBean 336 * @throws StoreException If the action is already present 337 */ 338 public void insertCoordinatorAction(final CoordinatorActionBean action) throws StoreException { 339 ParamChecker.notNull(action, "CoordinatorActionBean"); 340 doOperation("insertCoordinatorAction", new Callable<Void>() { 341 public Void call() throws StoreException { 342 entityManager.persist(action); 343 return null; 344 } 345 }); 346 } 347 348 /** 349 * Update the given action bean to DB. 350 * 351 * @param action Action Bean 352 * @throws StoreException if action doesn't exist 353 */ 354 public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException { 355 ParamChecker.notNull(action, "CoordinatorActionBean"); 356 doOperation("updateCoordinatorAction", new Callable<Void>() { 357 public Void call() throws StoreException { 358 Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION"); 359 q.setParameter("id", action.getId()); 360 setActionQueryParameters(action, q); 361 q.executeUpdate(); 362 return null; 363 } 364 }); 365 } 366 367 /** 368 * Update the given action bean to DB. 369 * 370 * @param action Action Bean 371 * @throws StoreException if action doesn't exist 372 */ 373 public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException { 374 ParamChecker.notNull(action, "CoordinatorActionBean"); 375 doOperation("updateCoordinatorAction", new Callable<Void>() { 376 public Void call() throws StoreException { 377 Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION_MIN"); 378 q.setParameter("id", action.getId()); 379 q.setParameter("missingDependencies", action.getMissingDependencies()); 380 q.setParameter("lastModifiedTime", new Date()); 381 q.setParameter("status", action.getStatus().toString()); 382 q.setParameter("actionXml", action.getActionXml()); 383 q.executeUpdate(); 384 return null; 385 } 386 }); 387 } 388 389 /** 390 * Update the given coordinator job bean to DB. 391 * 392 * @param jobbean Coordinator Job Bean 393 * @throws StoreException if action doesn't exist 394 */ 395 public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException { 396 ParamChecker.notNull(job, "CoordinatorJobBean"); 397 doOperation("updateJob", new Callable<Void>() { 398 public Void call() throws StoreException { 399 Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB"); 400 q.setParameter("id", job.getId()); 401 setJobQueryParameters(job, q); 402 q.executeUpdate(); 403 return null; 404 } 405 }); 406 } 407 408 public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException { 409 ParamChecker.notNull(job, "CoordinatorJobBean"); 410 doOperation("updateJobStatus", new Callable<Void>() { 411 public Void call() throws StoreException { 412 Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB_STATUS"); 413 q.setParameter("id", job.getId()); 414 q.setParameter("status", job.getStatus().toString()); 415 q.setParameter("lastModifiedTime", new Date()); 416 q.executeUpdate(); 417 return null; 418 } 419 }); 420 } 421 422 private <V> V doOperation(String name, Callable<V> command) throws StoreException { 423 try { 424 Instrumentation.Cron cron = new Instrumentation.Cron(); 425 cron.start(); 426 V retVal; 427 try { 428 retVal = command.call(); 429 } 430 finally { 431 cron.stop(); 432 } 433 Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron); 434 return retVal; 435 } 436 catch (StoreException ex) { 437 throw ex; 438 } 439 catch (SQLException ex) { 440 throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex); 441 } 442 catch (Exception e) { 443 throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e); 444 } 445 } 446 447 private void setJobQueryParameters(CoordinatorJobBean jBean, Query q) { 448 q.setParameter("appName", jBean.getAppName()); 449 q.setParameter("appPath", jBean.getAppPath()); 450 q.setParameter("concurrency", jBean.getConcurrency()); 451 q.setParameter("conf", jBean.getConf()); 452 q.setParameter("externalId", jBean.getExternalId()); 453 q.setParameter("frequency", jBean.getFrequency()); 454 q.setParameter("lastActionNumber", jBean.getLastActionNumber()); 455 q.setParameter("timeOut", jBean.getTimeout()); 456 q.setParameter("timeZone", jBean.getTimeZone()); 457 q.setParameter("createdTime", jBean.getCreatedTimestamp()); 458 q.setParameter("endTime", jBean.getEndTimestamp()); 459 q.setParameter("execution", jBean.getExecution()); 460 q.setParameter("jobXml", jBean.getJobXml()); 461 q.setParameter("lastAction", jBean.getLastActionTimestamp()); 462 q.setParameter("lastModifiedTime", new Date()); 463 q.setParameter("nextMaterializedTime", jBean.getNextMaterializedTimestamp()); 464 q.setParameter("origJobXml", jBean.getOrigJobXml()); 465 q.setParameter("slaXml", jBean.getSlaXml()); 466 q.setParameter("startTime", jBean.getStartTimestamp()); 467 q.setParameter("status", jBean.getStatus().toString()); 468 q.setParameter("timeUnit", jBean.getTimeUnitStr()); 469 } 470 471 private void setActionQueryParameters(CoordinatorActionBean aBean, Query q) { 472 q.setParameter("actionNumber", aBean.getActionNumber()); 473 q.setParameter("actionXml", aBean.getActionXml()); 474 q.setParameter("consoleUrl", aBean.getConsoleUrl()); 475 q.setParameter("createdConf", aBean.getCreatedConf()); 476 q.setParameter("errorCode", aBean.getErrorCode()); 477 q.setParameter("errorMessage", aBean.getErrorMessage()); 478 q.setParameter("externalStatus", aBean.getExternalStatus()); 479 q.setParameter("missingDependencies", aBean.getMissingDependencies()); 480 q.setParameter("runConf", aBean.getRunConf()); 481 q.setParameter("timeOut", aBean.getTimeOut()); 482 q.setParameter("trackerUri", aBean.getTrackerUri()); 483 q.setParameter("type", aBean.getType()); 484 q.setParameter("createdTime", aBean.getCreatedTimestamp()); 485 q.setParameter("externalId", aBean.getExternalId()); 486 q.setParameter("jobId", aBean.getJobId()); 487 q.setParameter("lastModifiedTime", new Date()); 488 q.setParameter("nominalTime", aBean.getNominalTimestamp()); 489 q.setParameter("slaXml", aBean.getSlaXml()); 490 q.setParameter("status", aBean.getStatus().toString()); 491 } 492 493 494 /** 495 * Purge the coordinators completed older than given days. 496 * 497 * @param olderThanDays number of days for which to preserve the coordinators 498 * @param limit maximum number of coordinator jobs to be purged 499 * @throws StoreException 500 */ 501 public void purge(final long olderThanDays, final int limit) throws StoreException { 502 doOperation("coord-purge", new Callable<Void>() { 503 public Void call() throws SQLException, StoreException, WorkflowException { 504 Timestamp lastModTm = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS)); 505 Query jobQ = entityManager.createNamedQuery("GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS"); 506 jobQ.setParameter("lastModTime", lastModTm); 507 jobQ.setMaxResults(limit); 508 List<CoordinatorJobBean> coordJobs = jobQ.getResultList(); 509 510 int actionDeleted = 0; 511 if (coordJobs.size() != 0) { 512 for (CoordinatorJobBean coord : coordJobs) { 513 String jobId = coord.getId(); 514 entityManager.remove(coord); 515 Query g = entityManager.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR"); 516 g.setParameter("jobId", jobId); 517 actionDeleted += g.executeUpdate(); 518 } 519 } 520 521 XLog.getLog(getClass()).debug("ENDED Coord Purge deleted jobs :" + coordJobs.size() + " and actions " + actionDeleted); 522 return null; 523 } 524 }); 525 } 526 527 public void commit() throws StoreException { 528 } 529 530 public void close() throws StoreException { 531 } 532 533 public CoordinatorJobBean getCoordinatorJobs(String id) { 534 // TODO Auto-generated method stub 535 return null; 536 } 537 538 public CoordinatorJobInfo getCoordinatorInfo(final Map<String, List<String>> filter, final int start, final int len) 539 throws StoreException { 540 541 CoordinatorJobInfo coordJobInfo = doOperation("getCoordinatorJobInfo", new Callable<CoordinatorJobInfo>() { 542 public CoordinatorJobInfo call() throws SQLException, StoreException { 543 List<String> orArray = new ArrayList<String>(); 544 List<String> colArray = new ArrayList<String>(); 545 List<String> valArray = new ArrayList<String>(); 546 StringBuilder sb = new StringBuilder(""); 547 548 StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr, 549 StoreStatusFilter.coordCountStr); 550 551 int realLen = 0; 552 553 Query q = null; 554 Query qTotal = null; 555 if (orArray.size() == 0) { 556 q = entityManager.createNamedQuery("GET_COORD_JOBS_COLUMNS"); 557 q.setFirstResult(start - 1); 558 q.setMaxResults(len); 559 qTotal = entityManager.createNamedQuery("GET_COORD_JOBS_COUNT"); 560 } 561 else { 562 StringBuilder sbTotal = new StringBuilder(sb); 563 sb.append(" order by w.createdTimestamp desc "); 564 XLog.getLog(getClass()).debug("Created String is **** " + sb.toString()); 565 q = entityManager.createQuery(sb.toString()); 566 q.setFirstResult(start - 1); 567 q.setMaxResults(len); 568 qTotal = entityManager.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr, 569 StoreStatusFilter.coordCountStr)); 570 } 571 572 for (int i = 0; i < orArray.size(); i++) { 573 q.setParameter(colArray.get(i), valArray.get(i)); 574 qTotal.setParameter(colArray.get(i), valArray.get(i)); 575 } 576 577 OpenJPAQuery kq = OpenJPAPersistence.cast(q); 578 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan(); 579 fetch.setFetchBatchSize(20); 580 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE); 581 fetch.setFetchDirection(FetchDirection.FORWARD); 582 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST); 583 List<?> resultList = q.getResultList(); 584 List<Object[]> objectArrList = (List<Object[]>) resultList; 585 List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>(); 586 587 for (Object[] arr : objectArrList) { 588 CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr); 589 coordBeansList.add(ww); 590 } 591 592 realLen = ((Long) qTotal.getSingleResult()).intValue(); 593 594 return new CoordinatorJobInfo(coordBeansList, start, len, realLen); 595 } 596 }); 597 return coordJobInfo; 598 } 599 600 private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) { 601 CoordinatorJobBean bean = new CoordinatorJobBean(); 602 bean.setId((String) arr[0]); 603 if (arr[1] != null) { 604 bean.setAppName((String) arr[1]); 605 } 606 if (arr[2] != null) { 607 bean.setStatus(Status.valueOf((String) arr[2])); 608 } 609 if (arr[3] != null) { 610 bean.setUser((String) arr[3]); 611 } 612 if (arr[4] != null) { 613 bean.setGroup((String) arr[4]); 614 } 615 if (arr[5] != null) { 616 bean.setStartTime((Timestamp) arr[5]); 617 } 618 if (arr[6] != null) { 619 bean.setEndTime((Timestamp) arr[6]); 620 } 621 if (arr[7] != null) { 622 bean.setAppPath((String) arr[7]); 623 } 624 if (arr[8] != null) { 625 bean.setConcurrency(((Integer) arr[8]).intValue()); 626 } 627 if (arr[9] != null) { 628 bean.setFrequency((String) arr[9]); 629 } 630 if (arr[10] != null) { 631 bean.setLastActionTime((Timestamp) arr[10]); 632 } 633 if (arr[11] != null) { 634 bean.setNextMaterializedTime((Timestamp) arr[11]); 635 } 636 if (arr[13] != null) { 637 bean.setTimeUnit(Timeunit.valueOf((String) arr[13])); 638 } 639 if (arr[14] != null) { 640 bean.setTimeZone((String) arr[14]); 641 } 642 if (arr[15] != null) { 643 bean.setTimeout((Integer) arr[15]); 644 } 645 return bean; 646 } 647 648 /** 649 * Loads all actions for the given Coordinator job. 650 * 651 * @param jobId coordinator job id 652 * @param locking true if Actions are to be locked 653 * @return A List of CoordinatorActionBean 654 * @throws StoreException 655 */ 656 public Integer getActionsForCoordinatorJob(final String jobId, final boolean locking) 657 throws StoreException { 658 ParamChecker.notEmpty(jobId, "CoordinatorJobID"); 659 Integer actionsCount = doOperation("getActionsForCoordinatorJob", 660 new Callable<Integer>() { 661 @SuppressWarnings("unchecked") 662 public Integer call() throws StoreException { 663 List<CoordinatorActionBean> actions; 664 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>(); 665 try { 666 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB"); 667 q.setParameter("jobId", jobId); 668 /* 669 * if (locking) { // 670 * q.setHint("openjpa.FetchPlan.ReadLockMode", // 671 * "READ"); OpenJPAQuery oq = 672 * OpenJPAPersistence.cast(q); JDBCFetchPlan fetch = 673 * (JDBCFetchPlan) oq.getFetchPlan(); 674 * fetch.setReadLockMode(LockModeType.WRITE); 675 * fetch.setLockTimeout(-1); // 1 second } 676 */ 677 Long count = (Long) q.getSingleResult(); 678 return Integer.valueOf(count.intValue()); 679 /*actions = q.getResultList(); 680 for (CoordinatorActionBean a : actions) { 681 CoordinatorActionBean aa = getBeanForRunningCoordAction(a); 682 actionList.add(aa); 683 }*/ 684 } 685 catch (IllegalStateException e) { 686 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 687 } 688 /* 689 * if (locking) { return actions; } else { 690 */ 691 692 // } 693 } 694 }); 695 return actionsCount; 696 } 697 698 /** 699 * Loads given number of actions for the given Coordinator job. 700 * 701 * @param jobId coordinator job id 702 * @param start offset for select statement 703 * @param len number of Workflow Actions to be returned 704 * @return A List of CoordinatorActionBean 705 * @throws StoreException 706 */ 707 public List<CoordinatorActionBean> getActionsSubsetForCoordinatorJob(final String jobId, final int start, 708 final int len) throws StoreException { 709 ParamChecker.notEmpty(jobId, "CoordinatorJobID"); 710 List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob", 711 new Callable<List<CoordinatorActionBean>>() { 712 @SuppressWarnings("unchecked") 713 public List<CoordinatorActionBean> call() throws StoreException { 714 List<CoordinatorActionBean> actions; 715 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>(); 716 try { 717 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB"); 718 q.setParameter("jobId", jobId); 719 q.setFirstResult(start - 1); 720 q.setMaxResults(len); 721 actions = q.getResultList(); 722 for (CoordinatorActionBean a : actions) { 723 CoordinatorActionBean aa = getBeanForRunningCoordAction(a); 724 actionList.add(aa); 725 } 726 } 727 catch (IllegalStateException e) { 728 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 729 } 730 return actionList; 731 } 732 }); 733 return actions; 734 } 735 736 protected CoordinatorActionBean getBeanForRunningCoordAction(CoordinatorActionBean a) { 737 if (a != null) { 738 CoordinatorActionBean action = new CoordinatorActionBean(); 739 action.setId(a.getId()); 740 action.setActionNumber(a.getActionNumber()); 741 action.setActionXml(a.getActionXml()); 742 action.setConsoleUrl(a.getConsoleUrl()); 743 action.setCreatedConf(a.getCreatedConf()); 744 //action.setErrorCode(a.getErrorCode()); 745 //action.setErrorMessage(a.getErrorMessage()); 746 action.setExternalStatus(a.getExternalStatus()); 747 action.setMissingDependencies(a.getMissingDependencies()); 748 action.setRunConf(a.getRunConf()); 749 action.setTimeOut(a.getTimeOut()); 750 action.setTrackerUri(a.getTrackerUri()); 751 action.setType(a.getType()); 752 action.setCreatedTime(a.getCreatedTime()); 753 action.setExternalId(a.getExternalId()); 754 action.setJobId(a.getJobId()); 755 action.setLastModifiedTime(a.getLastModifiedTime()); 756 action.setNominalTime(a.getNominalTime()); 757 action.setSlaXml(a.getSlaXml()); 758 action.setStatus(a.getStatus()); 759 return action; 760 } 761 return null; 762 } 763 764 public CoordinatorActionBean getAction(String id, boolean b) { 765 return null; 766 } 767 768 769 public List<CoordinatorActionBean> getRunningActionsForCoordinatorJob(final String jobId, final boolean locking) 770 throws StoreException { 771 ParamChecker.notEmpty(jobId, "CoordinatorJobID"); 772 List<CoordinatorActionBean> actions = doOperation("getRunningActionsForCoordinatorJob", 773 new Callable<List<CoordinatorActionBean>>() { 774 @SuppressWarnings("unchecked") 775 public List<CoordinatorActionBean> call() throws StoreException { 776 List<CoordinatorActionBean> actions; 777 try { 778 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_FOR_COORD_JOB"); 779 q.setParameter("jobId", jobId); 780 /* 781 * if (locking) { 782 * q.setHint("openjpa.FetchPlan.ReadLockMode", 783 * "READ"); OpenJPAQuery oq = 784 * OpenJPAPersistence.cast(q); FetchPlan fetch = 785 * oq.getFetchPlan(); 786 * fetch.setReadLockMode(LockModeType.WRITE); 787 * fetch.setLockTimeout(-1); // no limit } 788 */ 789 actions = q.getResultList(); 790 return actions; 791 } 792 catch (IllegalStateException e) { 793 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 794 } 795 } 796 }); 797 return actions; 798 } 799 800 public List<CoordinatorActionBean> getRunningActionsOlderThan(final long checkAgeSecs, final boolean locking) 801 throws StoreException { 802 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan", 803 new Callable<List<CoordinatorActionBean>>() { 804 @SuppressWarnings("unchecked") 805 public List<CoordinatorActionBean> call() throws StoreException { 806 List<CoordinatorActionBean> actions; 807 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 808 try { 809 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_OLDER_THAN"); 810 q.setParameter("lastModifiedTime", ts); 811 /* 812 * if (locking) { OpenJPAQuery oq = 813 * OpenJPAPersistence.cast(q); FetchPlan fetch = 814 * oq.getFetchPlan(); 815 * fetch.setReadLockMode(LockModeType.WRITE); 816 * fetch.setLockTimeout(-1); // no limit } 817 */ 818 actions = q.getResultList(); 819 return actions; 820 } 821 catch (IllegalStateException e) { 822 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 823 } 824 } 825 }); 826 return actions; 827 } 828 829 public List<CoordinatorActionBean> getRecoveryActionsOlderThan(final long checkAgeSecs, final boolean locking) 830 throws StoreException { 831 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan", 832 new Callable<List<CoordinatorActionBean>>() { 833 @SuppressWarnings("unchecked") 834 public List<CoordinatorActionBean> call() throws StoreException { 835 List<CoordinatorActionBean> actions; 836 try { 837 Query q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN"); 838 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 839 q.setParameter("lastModifiedTime", ts); 840 /* 841 * if (locking) { OpenJPAQuery oq = 842 * OpenJPAPersistence.cast(q); FetchPlan fetch = 843 * oq.getFetchPlan(); 844 * fetch.setReadLockMode(LockModeType.WRITE); 845 * fetch.setLockTimeout(-1); // no limit } 846 */ 847 actions = q.getResultList(); 848 return actions; 849 } 850 catch (IllegalStateException e) { 851 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 852 } 853 } 854 }); 855 return actions; 856 } 857 858 /** 859 * Get coordinator action beans for given start date and end date 860 * 861 * @param startDate 862 * @param endDate 863 * @return list of coordinator action beans 864 * @throws StoreException 865 */ 866 public List<CoordinatorActionBean> getCoordActionsForDates(final String jobId, final Date startDate, 867 final Date endDate) 868 throws StoreException { 869 List<CoordinatorActionBean> actions = doOperation("getCoordActionsForDates", 870 new Callable<List<CoordinatorActionBean>>() { 871 @SuppressWarnings("unchecked") 872 public List<CoordinatorActionBean> call() throws StoreException { 873 List<CoordinatorActionBean> actions; 874 try { 875 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_DATES"); 876 q.setParameter("jobId", jobId); 877 q.setParameter("startTime", new Timestamp(startDate.getTime())); 878 q.setParameter("endTime", new Timestamp(endDate.getTime())); 879 actions = q.getResultList(); 880 881 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>(); 882 for (CoordinatorActionBean a : actions) { 883 CoordinatorActionBean aa = getBeanForRunningCoordAction(a); 884 actionList.add(aa); 885 } 886 return actionList; 887 } 888 catch (IllegalStateException e) { 889 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 890 } 891 } 892 }); 893 return actions; 894 } 895 896 /** 897 * Get coordinator action bean for given date 898 * 899 * @param nominalTime 900 * @return CoordinatorActionBean 901 * @throws StoreException 902 */ 903 public CoordinatorActionBean getCoordActionForNominalTime(final String jobId, final Date nominalTime) 904 throws StoreException { 905 CoordinatorActionBean action = doOperation("getCoordActionForNominalTime", 906 new Callable<CoordinatorActionBean>() { 907 @SuppressWarnings("unchecked") 908 public CoordinatorActionBean call() throws StoreException { 909 List<CoordinatorActionBean> actions; 910 Query q = entityManager.createNamedQuery("GET_ACTION_FOR_NOMINALTIME"); 911 q.setParameter("jobId", jobId); 912 q.setParameter("nominalTime", new Timestamp(nominalTime.getTime())); 913 actions = q.getResultList(); 914 915 CoordinatorActionBean action = null; 916 if (actions.size() > 0) { 917 action = actions.get(0); 918 } 919 else { 920 throw new StoreException(ErrorCode.E0605, DateUtils.formatDateOozieTZ(nominalTime)); 921 } 922 return getBeanForRunningCoordAction(action); 923 } 924 }); 925 return action; 926 } 927 928 public List<String> getRecoveryActionsGroupByJobId(final long checkAgeSecs) throws StoreException { 929 List<String> jobids = doOperation("getRecoveryActionsGroupByJobId", new Callable<List<String>>() { 930 @SuppressWarnings("unchecked") 931 public List<String> call() throws StoreException { 932 List<String> jobids = new ArrayList<String>(); 933 try { 934 Query q = entityManager.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID"); 935 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 936 q.setParameter(1, ts); 937 List<Object[]> list = q.getResultList(); 938 939 for (Object[] arr : list) { 940 if (arr != null && arr[0] != null) { 941 jobids.add((String) arr[0]); 942 } 943 } 944 945 return jobids; 946 } 947 catch (IllegalStateException e) { 948 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 949 } 950 } 951 }); 952 return jobids; 953 } 954 }