001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.store; 019 020 import java.sql.SQLException; 021 import java.sql.Timestamp; 022 import java.util.ArrayList; 023 import java.util.Date; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.concurrent.Callable; 027 028 import javax.persistence.EntityManager; 029 import javax.persistence.Query; 030 031 import org.apache.oozie.CoordinatorActionBean; 032 import org.apache.oozie.CoordinatorJobBean; 033 import org.apache.oozie.CoordinatorJobInfo; 034 import org.apache.oozie.ErrorCode; 035 import org.apache.oozie.client.Job.Status; 036 import org.apache.oozie.client.CoordinatorJob.Timeunit; 037 import org.apache.oozie.service.InstrumentationService; 038 import org.apache.oozie.service.Services; 039 import org.apache.oozie.util.DateUtils; 040 import org.apache.oozie.util.Instrumentation; 041 import org.apache.oozie.util.ParamChecker; 042 import org.apache.oozie.util.XLog; 043 import org.apache.oozie.workflow.WorkflowException; 044 import org.apache.openjpa.persistence.OpenJPAPersistence; 045 import org.apache.openjpa.persistence.OpenJPAQuery; 046 import org.apache.openjpa.persistence.jdbc.FetchDirection; 047 import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan; 048 import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm; 049 import org.apache.openjpa.persistence.jdbc.ResultSetType; 050 051 /** 052 * DB Implementation of Coord Store 053 */ 054 public class CoordinatorStore extends Store { 055 private final XLog log = XLog.getLog(getClass()); 056 057 private EntityManager entityManager; 058 private static final String INSTR_GROUP = "db"; 059 public static final int LOCK_TIMEOUT = 50000; 060 private static final long DAY_IN_MS = 24 * 60 * 60 * 1000; 061 062 public CoordinatorStore(boolean selectForUpdate) throws StoreException { 063 super(); 064 entityManager = getEntityManager(); 065 } 066 067 public CoordinatorStore(Store store, boolean selectForUpdate) throws StoreException { 068 super(store); 069 entityManager = getEntityManager(); 070 } 071 072 /** 073 * Create a CoordJobBean. It also creates the process instance for the job. 074 * 075 * @param workflow workflow bean 076 * @throws StoreException 077 */ 078 079 public void insertCoordinatorJob(final CoordinatorJobBean coordinatorJob) throws StoreException { 080 ParamChecker.notNull(coordinatorJob, "coordinatorJob"); 081 082 doOperation("insertCoordinatorJob", new Callable<Void>() { 083 public Void call() throws StoreException { 084 entityManager.persist(coordinatorJob); 085 return null; 086 } 087 }); 088 } 089 090 /** 091 * Load the CoordinatorJob into a Bean and return it. Also load the Workflow Instance into the bean. And lock the 092 * Workflow depending on the locking parameter. 093 * 094 * @param id Job ID 095 * @param locking Flag for Table Lock 096 * @return CoordinatorJobBean 097 * @throws StoreException 098 */ 099 public CoordinatorJobBean getCoordinatorJob(final String id, final boolean locking) throws StoreException { 100 ParamChecker.notEmpty(id, "CoordJobId"); 101 CoordinatorJobBean cjBean = doOperation("getCoordinatorJob", new Callable<CoordinatorJobBean>() { 102 @SuppressWarnings("unchecked") 103 public CoordinatorJobBean call() throws StoreException { 104 Query q = entityManager.createNamedQuery("GET_COORD_JOB"); 105 q.setParameter("id", id); 106 /* 107 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q); 108 * // q.setHint("openjpa.FetchPlan.ReadLockMode","WRITE"); 109 * FetchPlan fetch = oq.getFetchPlan(); 110 * fetch.setReadLockMode(LockModeType.WRITE); 111 * fetch.setLockTimeout(-1); // 1 second } 112 */ 113 List<CoordinatorJobBean> cjBeans = q.getResultList(); 114 115 if (cjBeans.size() > 0) { 116 return cjBeans.get(0); 117 } 118 else { 119 throw new StoreException(ErrorCode.E0604, id); 120 } 121 } 122 }); 123 124 cjBean.setStatus(cjBean.getStatus()); 125 return cjBean; 126 } 127 128 /** 129 * Get a list of Coordinator Jobs that should be materialized. Jobs with a 'last materialized time' older than the 130 * argument will be returned. 131 * 132 * @param d Date 133 * @return List of Coordinator Jobs that have a last materialized time older than input date 134 * @throws StoreException 135 */ 136 public List<CoordinatorJobBean> getCoordinatorJobsToBeMaterialized(final Date d, final int limit) 137 throws StoreException { 138 139 ParamChecker.notNull(d, "Coord Job Materialization Date"); 140 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsToBeMaterialized", 141 new Callable<List<CoordinatorJobBean>>() { 142 public List<CoordinatorJobBean> call() throws StoreException { 143 144 List<CoordinatorJobBean> cjBeans; 145 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>(); 146 try { 147 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN"); 148 q.setParameter("matTime", new Timestamp(d.getTime())); 149 if (limit > 0) { 150 q.setMaxResults(limit); 151 } 152 /* 153 OpenJPAQuery oq = OpenJPAPersistence.cast(q); 154 FetchPlan fetch = oq.getFetchPlan(); 155 fetch.setReadLockMode(LockModeType.WRITE); 156 fetch.setLockTimeout(-1); // no limit 157 */ 158 cjBeans = q.getResultList(); 159 // copy results to a new object 160 for (CoordinatorJobBean j : cjBeans) { 161 jobList.add(j); 162 } 163 } 164 catch (IllegalStateException e) { 165 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 166 } 167 return jobList; 168 169 } 170 }); 171 return cjBeans; 172 } 173 174 /** 175 * A list of Coordinator Jobs that are matched with the status and have last materialized time' older than 176 * checkAgeSecs will be returned. 177 * 178 * @param checkAgeSecs Job age in Seconds 179 * @param status Coordinator Job Status 180 * @param limit Number of results to return 181 * @param locking Flag for Table Lock 182 * @return List of Coordinator Jobs that are matched with the parameters. 183 * @throws StoreException 184 */ 185 public List<CoordinatorJobBean> getCoordinatorJobsOlderThanStatus(final long checkAgeSecs, final String status, 186 final int limit, final boolean locking) throws StoreException { 187 188 ParamChecker.notNull(status, "Coord Job Status"); 189 List<CoordinatorJobBean> cjBeans = doOperation("getCoordinatorJobsOlderThanStatus", 190 new Callable<List<CoordinatorJobBean>>() { 191 public List<CoordinatorJobBean> call() throws StoreException { 192 193 List<CoordinatorJobBean> cjBeans; 194 List<CoordinatorJobBean> jobList = new ArrayList<CoordinatorJobBean>(); 195 try { 196 Query q = entityManager.createNamedQuery("GET_COORD_JOBS_OLDER_THAN_STATUS"); 197 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 198 q.setParameter("lastModTime", ts); 199 q.setParameter("status", status); 200 if (limit > 0) { 201 q.setMaxResults(limit); 202 } 203 /* 204 * if (locking) { OpenJPAQuery oq = 205 * OpenJPAPersistence.cast(q); FetchPlan fetch = 206 * oq.getFetchPlan(); 207 * fetch.setReadLockMode(LockModeType.WRITE); 208 * fetch.setLockTimeout(-1); // no limit } 209 */ 210 cjBeans = q.getResultList(); 211 for (CoordinatorJobBean j : cjBeans) { 212 jobList.add(j); 213 } 214 } 215 catch (Exception e) { 216 throw new StoreException(ErrorCode.E0603, e.getMessage(), e); 217 } 218 return jobList; 219 220 } 221 }); 222 return cjBeans; 223 } 224 225 /** 226 * Load the CoordinatorAction into a Bean and return it. 227 * 228 * @param id action ID 229 * @return CoordinatorActionBean 230 * @throws StoreException 231 */ 232 public CoordinatorActionBean getCoordinatorAction(final String id, final boolean locking) throws StoreException { 233 ParamChecker.notEmpty(id, "actionID"); 234 CoordinatorActionBean caBean = doOperation("getCoordinatorAction", new Callable<CoordinatorActionBean>() { 235 public CoordinatorActionBean call() throws StoreException { 236 Query q = entityManager.createNamedQuery("GET_COORD_ACTION"); 237 q.setParameter("id", id); 238 OpenJPAQuery oq = OpenJPAPersistence.cast(q); 239 /* 240 * if (locking) { //q.setHint("openjpa.FetchPlan.ReadLockMode", 241 * "WRITE"); FetchPlan fetch = oq.getFetchPlan(); 242 * fetch.setReadLockMode(LockModeType.WRITE); 243 * fetch.setLockTimeout(-1); // no limit } 244 */ 245 246 CoordinatorActionBean action = null; 247 List<CoordinatorActionBean> actions = q.getResultList(); 248 if (actions.size() > 0) { 249 action = actions.get(0); 250 } 251 else { 252 throw new StoreException(ErrorCode.E0605, id); 253 } 254 255 /* 256 * if (locking) return action; else 257 */ 258 return getBeanForRunningCoordAction(action); 259 } 260 }); 261 return caBean; 262 } 263 264 /** 265 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <= 266 * concurrency number. Sort returned actions based on execution order (FIFO, LIFO, LAST_ONLY) 267 * 268 * @param id job ID 269 * @param numResults number of results to return 270 * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY 271 * @return List of CoordinatorActionBean 272 * @throws StoreException 273 */ 274 public List<CoordinatorActionBean> getCoordinatorActionsForJob(final String id, final int numResults, 275 final String executionOrder) throws StoreException { 276 ParamChecker.notEmpty(id, "jobID"); 277 List<CoordinatorActionBean> caBeans = doOperation("getCoordinatorActionsForJob", 278 new Callable<List<CoordinatorActionBean>>() { 279 public List<CoordinatorActionBean> call() throws StoreException { 280 281 List<CoordinatorActionBean> caBeans; 282 Query q; 283 // check if executionOrder is FIFO, LIFO, or LAST_ONLY 284 if (executionOrder.equalsIgnoreCase("FIFO")) { 285 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO"); 286 } 287 else { 288 q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO"); 289 } 290 q.setParameter("jobId", id); 291 // if executionOrder is LAST_ONLY, only retrieve first 292 // record in LIFO, 293 // otherwise, use numResults if it is positive. 294 if (executionOrder.equalsIgnoreCase("LAST_ONLY")) { 295 q.setMaxResults(1); 296 } 297 else { 298 if (numResults > 0) { 299 q.setMaxResults(numResults); 300 } 301 } 302 caBeans = q.getResultList(); 303 return caBeans; 304 } 305 }); 306 return caBeans; 307 } 308 309 /** 310 * Return CoordinatorActions for a jobID. Action should be in READY state. Number of returned actions should be <= 311 * concurrency number. 312 * 313 * @param id job ID 314 * @return Number of running actions 315 * @throws StoreException 316 */ 317 public int getCoordinatorRunningActionsCount(final String id) throws StoreException { 318 ParamChecker.notEmpty(id, "jobID"); 319 Integer cnt = doOperation("getCoordinatorRunningActionsCount", new Callable<Integer>() { 320 public Integer call() throws SQLException { 321 322 Query q = entityManager.createNamedQuery("GET_COORD_RUNNING_ACTIONS_COUNT"); 323 324 q.setParameter("jobId", id); 325 Long count = (Long) q.getSingleResult(); 326 return Integer.valueOf(count.intValue()); 327 } 328 }); 329 return cnt.intValue(); 330 } 331 332 /** 333 * Create a new Action record in the ACTIONS table with the given Bean. 334 * 335 * @param action WorkflowActionBean 336 * @throws StoreException If the action is already present 337 */ 338 public void insertCoordinatorAction(final CoordinatorActionBean action) throws StoreException { 339 ParamChecker.notNull(action, "CoordinatorActionBean"); 340 doOperation("insertCoordinatorAction", new Callable<Void>() { 341 public Void call() throws StoreException { 342 entityManager.persist(action); 343 return null; 344 } 345 }); 346 } 347 348 /** 349 * Update the given action bean to DB. 350 * 351 * @param action Action Bean 352 * @throws StoreException if action doesn't exist 353 */ 354 public void updateCoordinatorAction(final CoordinatorActionBean action) throws StoreException { 355 ParamChecker.notNull(action, "CoordinatorActionBean"); 356 doOperation("updateCoordinatorAction", new Callable<Void>() { 357 public Void call() throws StoreException { 358 Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION"); 359 q.setParameter("id", action.getId()); 360 setActionQueryParameters(action, q); 361 q.executeUpdate(); 362 return null; 363 } 364 }); 365 } 366 367 /** 368 * Update the given action bean to DB. 369 * 370 * @param action Action Bean 371 * @throws StoreException if action doesn't exist 372 */ 373 public void updateCoordActionMin(final CoordinatorActionBean action) throws StoreException { 374 ParamChecker.notNull(action, "CoordinatorActionBean"); 375 doOperation("updateCoordinatorAction", new Callable<Void>() { 376 public Void call() throws StoreException { 377 Query q = entityManager.createNamedQuery("UPDATE_COORD_ACTION_MIN"); 378 q.setParameter("id", action.getId()); 379 q.setParameter("missingDependencies", action.getMissingDependencies()); 380 q.setParameter("lastModifiedTime", new Date()); 381 q.setParameter("status", action.getStatus().toString()); 382 q.setParameter("actionXml", action.getActionXml()); 383 q.executeUpdate(); 384 return null; 385 } 386 }); 387 } 388 389 /** 390 * Update the given coordinator job bean to DB. 391 * 392 * @param jobbean Coordinator Job Bean 393 * @throws StoreException if action doesn't exist 394 */ 395 public void updateCoordinatorJob(final CoordinatorJobBean job) throws StoreException { 396 ParamChecker.notNull(job, "CoordinatorJobBean"); 397 doOperation("updateJob", new Callable<Void>() { 398 public Void call() throws StoreException { 399 Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB"); 400 q.setParameter("id", job.getId()); 401 setJobQueryParameters(job, q); 402 q.executeUpdate(); 403 return null; 404 } 405 }); 406 } 407 408 public void updateCoordinatorJobStatus(final CoordinatorJobBean job) throws StoreException { 409 ParamChecker.notNull(job, "CoordinatorJobBean"); 410 doOperation("updateJobStatus", new Callable<Void>() { 411 public Void call() throws StoreException { 412 Query q = entityManager.createNamedQuery("UPDATE_COORD_JOB_STATUS"); 413 q.setParameter("id", job.getId()); 414 q.setParameter("status", job.getStatus().toString()); 415 q.setParameter("lastModifiedTime", new Date()); 416 q.executeUpdate(); 417 return null; 418 } 419 }); 420 } 421 422 private <V> V doOperation(String name, Callable<V> command) throws StoreException { 423 try { 424 Instrumentation.Cron cron = new Instrumentation.Cron(); 425 cron.start(); 426 V retVal; 427 try { 428 retVal = command.call(); 429 } 430 finally { 431 cron.stop(); 432 } 433 Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron); 434 return retVal; 435 } 436 catch (StoreException ex) { 437 throw ex; 438 } 439 catch (SQLException ex) { 440 throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex); 441 } 442 catch (Exception e) { 443 throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e); 444 } 445 } 446 447 private void setJobQueryParameters(CoordinatorJobBean jBean, Query q) { 448 q.setParameter("appName", jBean.getAppName()); 449 q.setParameter("appPath", jBean.getAppPath()); 450 q.setParameter("concurrency", jBean.getConcurrency()); 451 q.setParameter("conf", jBean.getConf()); 452 q.setParameter("externalId", jBean.getExternalId()); 453 q.setParameter("frequency", jBean.getFrequency()); 454 q.setParameter("lastActionNumber", jBean.getLastActionNumber()); 455 q.setParameter("timeOut", jBean.getTimeout()); 456 q.setParameter("timeZone", jBean.getTimeZone()); 457 q.setParameter("authToken", jBean.getAuthToken()); 458 q.setParameter("createdTime", jBean.getCreatedTimestamp()); 459 q.setParameter("endTime", jBean.getEndTimestamp()); 460 q.setParameter("execution", jBean.getExecution()); 461 q.setParameter("jobXml", jBean.getJobXml()); 462 q.setParameter("lastAction", jBean.getLastActionTimestamp()); 463 q.setParameter("lastModifiedTime", new Date()); 464 q.setParameter("nextMaterializedTime", jBean.getNextMaterializedTimestamp()); 465 q.setParameter("origJobXml", jBean.getOrigJobXml()); 466 q.setParameter("slaXml", jBean.getSlaXml()); 467 q.setParameter("startTime", jBean.getStartTimestamp()); 468 q.setParameter("status", jBean.getStatus().toString()); 469 q.setParameter("timeUnit", jBean.getTimeUnitStr()); 470 } 471 472 private void setActionQueryParameters(CoordinatorActionBean aBean, Query q) { 473 q.setParameter("actionNumber", aBean.getActionNumber()); 474 q.setParameter("actionXml", aBean.getActionXml()); 475 q.setParameter("consoleUrl", aBean.getConsoleUrl()); 476 q.setParameter("createdConf", aBean.getCreatedConf()); 477 q.setParameter("errorCode", aBean.getErrorCode()); 478 q.setParameter("errorMessage", aBean.getErrorMessage()); 479 q.setParameter("externalStatus", aBean.getExternalStatus()); 480 q.setParameter("missingDependencies", aBean.getMissingDependencies()); 481 q.setParameter("runConf", aBean.getRunConf()); 482 q.setParameter("timeOut", aBean.getTimeOut()); 483 q.setParameter("trackerUri", aBean.getTrackerUri()); 484 q.setParameter("type", aBean.getType()); 485 q.setParameter("createdTime", aBean.getCreatedTimestamp()); 486 q.setParameter("externalId", aBean.getExternalId()); 487 q.setParameter("jobId", aBean.getJobId()); 488 q.setParameter("lastModifiedTime", new Date()); 489 q.setParameter("nominalTime", aBean.getNominalTimestamp()); 490 q.setParameter("slaXml", aBean.getSlaXml()); 491 q.setParameter("status", aBean.getStatus().toString()); 492 } 493 494 495 /** 496 * Purge the coordinators completed older than given days. 497 * 498 * @param olderThanDays number of days for which to preserve the coordinators 499 * @param limit maximum number of coordinator jobs to be purged 500 * @throws StoreException 501 */ 502 public void purge(final long olderThanDays, final int limit) throws StoreException { 503 doOperation("coord-purge", new Callable<Void>() { 504 public Void call() throws SQLException, StoreException, WorkflowException { 505 Timestamp lastModTm = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS)); 506 Query jobQ = entityManager.createNamedQuery("GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS"); 507 jobQ.setParameter("lastModTime", lastModTm); 508 jobQ.setMaxResults(limit); 509 List<CoordinatorJobBean> coordJobs = jobQ.getResultList(); 510 511 int actionDeleted = 0; 512 if (coordJobs.size() != 0) { 513 for (CoordinatorJobBean coord : coordJobs) { 514 String jobId = coord.getId(); 515 entityManager.remove(coord); 516 Query g = entityManager.createNamedQuery("DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR"); 517 g.setParameter("jobId", jobId); 518 actionDeleted += g.executeUpdate(); 519 } 520 } 521 522 XLog.getLog(getClass()).debug("ENDED Coord Purge deleted jobs :" + coordJobs.size() + " and actions " + actionDeleted); 523 return null; 524 } 525 }); 526 } 527 528 public void commit() throws StoreException { 529 } 530 531 public void close() throws StoreException { 532 } 533 534 public CoordinatorJobBean getCoordinatorJobs(String id) { 535 // TODO Auto-generated method stub 536 return null; 537 } 538 539 public CoordinatorJobInfo getCoordinatorInfo(final Map<String, List<String>> filter, final int start, final int len) 540 throws StoreException { 541 542 CoordinatorJobInfo coordJobInfo = doOperation("getCoordinatorJobInfo", new Callable<CoordinatorJobInfo>() { 543 public CoordinatorJobInfo call() throws SQLException, StoreException { 544 List<String> orArray = new ArrayList<String>(); 545 List<String> colArray = new ArrayList<String>(); 546 List<String> valArray = new ArrayList<String>(); 547 StringBuilder sb = new StringBuilder(""); 548 549 StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr, 550 StoreStatusFilter.coordCountStr); 551 552 int realLen = 0; 553 554 Query q = null; 555 Query qTotal = null; 556 if (orArray.size() == 0) { 557 q = entityManager.createNamedQuery("GET_COORD_JOBS_COLUMNS"); 558 q.setFirstResult(start - 1); 559 q.setMaxResults(len); 560 qTotal = entityManager.createNamedQuery("GET_COORD_JOBS_COUNT"); 561 } 562 else { 563 StringBuilder sbTotal = new StringBuilder(sb); 564 sb.append(" order by w.createdTimestamp desc "); 565 XLog.getLog(getClass()).debug("Created String is **** " + sb.toString()); 566 q = entityManager.createQuery(sb.toString()); 567 q.setFirstResult(start - 1); 568 q.setMaxResults(len); 569 qTotal = entityManager.createQuery(sbTotal.toString().replace(StoreStatusFilter.coordSeletStr, 570 StoreStatusFilter.coordCountStr)); 571 } 572 573 for (int i = 0; i < orArray.size(); i++) { 574 q.setParameter(colArray.get(i), valArray.get(i)); 575 qTotal.setParameter(colArray.get(i), valArray.get(i)); 576 } 577 578 OpenJPAQuery kq = OpenJPAPersistence.cast(q); 579 JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan(); 580 fetch.setFetchBatchSize(20); 581 fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE); 582 fetch.setFetchDirection(FetchDirection.FORWARD); 583 fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST); 584 List<?> resultList = q.getResultList(); 585 List<Object[]> objectArrList = (List<Object[]>) resultList; 586 List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>(); 587 588 for (Object[] arr : objectArrList) { 589 CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr); 590 coordBeansList.add(ww); 591 } 592 593 realLen = ((Long) qTotal.getSingleResult()).intValue(); 594 595 return new CoordinatorJobInfo(coordBeansList, start, len, realLen); 596 } 597 }); 598 return coordJobInfo; 599 } 600 601 private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) { 602 CoordinatorJobBean bean = new CoordinatorJobBean(); 603 bean.setId((String) arr[0]); 604 if (arr[1] != null) { 605 bean.setAppName((String) arr[1]); 606 } 607 if (arr[2] != null) { 608 bean.setStatus(Status.valueOf((String) arr[2])); 609 } 610 if (arr[3] != null) { 611 bean.setUser((String) arr[3]); 612 } 613 if (arr[4] != null) { 614 bean.setGroup((String) arr[4]); 615 } 616 if (arr[5] != null) { 617 bean.setStartTime((Timestamp) arr[5]); 618 } 619 if (arr[6] != null) { 620 bean.setEndTime((Timestamp) arr[6]); 621 } 622 if (arr[7] != null) { 623 bean.setAppPath((String) arr[7]); 624 } 625 if (arr[8] != null) { 626 bean.setConcurrency(((Integer) arr[8]).intValue()); 627 } 628 if (arr[9] != null) { 629 bean.setFrequency(((Integer) arr[9]).intValue()); 630 } 631 if (arr[10] != null) { 632 bean.setLastActionTime((Timestamp) arr[10]); 633 } 634 if (arr[11] != null) { 635 bean.setNextMaterializedTime((Timestamp) arr[11]); 636 } 637 if (arr[13] != null) { 638 bean.setTimeUnit(Timeunit.valueOf((String) arr[13])); 639 } 640 if (arr[14] != null) { 641 bean.setTimeZone((String) arr[14]); 642 } 643 if (arr[15] != null) { 644 bean.setTimeout((Integer) arr[15]); 645 } 646 return bean; 647 } 648 649 /** 650 * Loads all actions for the given Coordinator job. 651 * 652 * @param jobId coordinator job id 653 * @param locking true if Actions are to be locked 654 * @return A List of CoordinatorActionBean 655 * @throws StoreException 656 */ 657 public Integer getActionsForCoordinatorJob(final String jobId, final boolean locking) 658 throws StoreException { 659 ParamChecker.notEmpty(jobId, "CoordinatorJobID"); 660 Integer actionsCount = doOperation("getActionsForCoordinatorJob", 661 new Callable<Integer>() { 662 @SuppressWarnings("unchecked") 663 public Integer call() throws StoreException { 664 List<CoordinatorActionBean> actions; 665 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>(); 666 try { 667 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB"); 668 q.setParameter("jobId", jobId); 669 /* 670 * if (locking) { // 671 * q.setHint("openjpa.FetchPlan.ReadLockMode", // 672 * "READ"); OpenJPAQuery oq = 673 * OpenJPAPersistence.cast(q); JDBCFetchPlan fetch = 674 * (JDBCFetchPlan) oq.getFetchPlan(); 675 * fetch.setReadLockMode(LockModeType.WRITE); 676 * fetch.setLockTimeout(-1); // 1 second } 677 */ 678 Long count = (Long) q.getSingleResult(); 679 return Integer.valueOf(count.intValue()); 680 /*actions = q.getResultList(); 681 for (CoordinatorActionBean a : actions) { 682 CoordinatorActionBean aa = getBeanForRunningCoordAction(a); 683 actionList.add(aa); 684 }*/ 685 } 686 catch (IllegalStateException e) { 687 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 688 } 689 /* 690 * if (locking) { return actions; } else { 691 */ 692 693 // } 694 } 695 }); 696 return actionsCount; 697 } 698 699 /** 700 * Loads given number of actions for the given Coordinator job. 701 * 702 * @param jobId coordinator job id 703 * @param start offset for select statement 704 * @param len number of Workflow Actions to be returned 705 * @return A List of CoordinatorActionBean 706 * @throws StoreException 707 */ 708 public List<CoordinatorActionBean> getActionsSubsetForCoordinatorJob(final String jobId, final int start, 709 final int len) throws StoreException { 710 ParamChecker.notEmpty(jobId, "CoordinatorJobID"); 711 List<CoordinatorActionBean> actions = doOperation("getActionsForCoordinatorJob", 712 new Callable<List<CoordinatorActionBean>>() { 713 @SuppressWarnings("unchecked") 714 public List<CoordinatorActionBean> call() throws StoreException { 715 List<CoordinatorActionBean> actions; 716 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>(); 717 try { 718 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB"); 719 q.setParameter("jobId", jobId); 720 q.setFirstResult(start - 1); 721 q.setMaxResults(len); 722 actions = q.getResultList(); 723 for (CoordinatorActionBean a : actions) { 724 CoordinatorActionBean aa = getBeanForRunningCoordAction(a); 725 actionList.add(aa); 726 } 727 } 728 catch (IllegalStateException e) { 729 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 730 } 731 return actionList; 732 } 733 }); 734 return actions; 735 } 736 737 protected CoordinatorActionBean getBeanForRunningCoordAction(CoordinatorActionBean a) { 738 if (a != null) { 739 CoordinatorActionBean action = new CoordinatorActionBean(); 740 action.setId(a.getId()); 741 action.setActionNumber(a.getActionNumber()); 742 action.setActionXml(a.getActionXml()); 743 action.setConsoleUrl(a.getConsoleUrl()); 744 action.setCreatedConf(a.getCreatedConf()); 745 //action.setErrorCode(a.getErrorCode()); 746 //action.setErrorMessage(a.getErrorMessage()); 747 action.setExternalStatus(a.getExternalStatus()); 748 action.setMissingDependencies(a.getMissingDependencies()); 749 action.setRunConf(a.getRunConf()); 750 action.setTimeOut(a.getTimeOut()); 751 action.setTrackerUri(a.getTrackerUri()); 752 action.setType(a.getType()); 753 action.setCreatedTime(a.getCreatedTime()); 754 action.setExternalId(a.getExternalId()); 755 action.setJobId(a.getJobId()); 756 action.setLastModifiedTime(a.getLastModifiedTime()); 757 action.setNominalTime(a.getNominalTime()); 758 action.setSlaXml(a.getSlaXml()); 759 action.setStatus(a.getStatus()); 760 return action; 761 } 762 return null; 763 } 764 765 public CoordinatorActionBean getAction(String id, boolean b) { 766 return null; 767 } 768 769 770 public List<CoordinatorActionBean> getRunningActionsForCoordinatorJob(final String jobId, final boolean locking) 771 throws StoreException { 772 ParamChecker.notEmpty(jobId, "CoordinatorJobID"); 773 List<CoordinatorActionBean> actions = doOperation("getRunningActionsForCoordinatorJob", 774 new Callable<List<CoordinatorActionBean>>() { 775 @SuppressWarnings("unchecked") 776 public List<CoordinatorActionBean> call() throws StoreException { 777 List<CoordinatorActionBean> actions; 778 try { 779 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_FOR_COORD_JOB"); 780 q.setParameter("jobId", jobId); 781 /* 782 * if (locking) { 783 * q.setHint("openjpa.FetchPlan.ReadLockMode", 784 * "READ"); OpenJPAQuery oq = 785 * OpenJPAPersistence.cast(q); FetchPlan fetch = 786 * oq.getFetchPlan(); 787 * fetch.setReadLockMode(LockModeType.WRITE); 788 * fetch.setLockTimeout(-1); // no limit } 789 */ 790 actions = q.getResultList(); 791 return actions; 792 } 793 catch (IllegalStateException e) { 794 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 795 } 796 } 797 }); 798 return actions; 799 } 800 801 public List<CoordinatorActionBean> getRunningActionsOlderThan(final long checkAgeSecs, final boolean locking) 802 throws StoreException { 803 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan", 804 new Callable<List<CoordinatorActionBean>>() { 805 @SuppressWarnings("unchecked") 806 public List<CoordinatorActionBean> call() throws StoreException { 807 List<CoordinatorActionBean> actions; 808 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 809 try { 810 Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS_OLDER_THAN"); 811 q.setParameter("lastModifiedTime", ts); 812 /* 813 * if (locking) { OpenJPAQuery oq = 814 * OpenJPAPersistence.cast(q); FetchPlan fetch = 815 * oq.getFetchPlan(); 816 * fetch.setReadLockMode(LockModeType.WRITE); 817 * fetch.setLockTimeout(-1); // no limit } 818 */ 819 actions = q.getResultList(); 820 return actions; 821 } 822 catch (IllegalStateException e) { 823 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 824 } 825 } 826 }); 827 return actions; 828 } 829 830 public List<CoordinatorActionBean> getRecoveryActionsOlderThan(final long checkAgeSecs, final boolean locking) 831 throws StoreException { 832 List<CoordinatorActionBean> actions = doOperation("getRunningActionsOlderThan", 833 new Callable<List<CoordinatorActionBean>>() { 834 @SuppressWarnings("unchecked") 835 public List<CoordinatorActionBean> call() throws StoreException { 836 List<CoordinatorActionBean> actions; 837 try { 838 Query q = entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN"); 839 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 840 q.setParameter("lastModifiedTime", ts); 841 /* 842 * if (locking) { OpenJPAQuery oq = 843 * OpenJPAPersistence.cast(q); FetchPlan fetch = 844 * oq.getFetchPlan(); 845 * fetch.setReadLockMode(LockModeType.WRITE); 846 * fetch.setLockTimeout(-1); // no limit } 847 */ 848 actions = q.getResultList(); 849 return actions; 850 } 851 catch (IllegalStateException e) { 852 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 853 } 854 } 855 }); 856 return actions; 857 } 858 859 /** 860 * Get coordinator action beans for given start date and end date 861 * 862 * @param startDate 863 * @param endDate 864 * @return list of coordinator action beans 865 * @throws StoreException 866 */ 867 public List<CoordinatorActionBean> getCoordActionsForDates(final String jobId, final Date startDate, 868 final Date endDate) 869 throws StoreException { 870 List<CoordinatorActionBean> actions = doOperation("getCoordActionsForDates", 871 new Callable<List<CoordinatorActionBean>>() { 872 @SuppressWarnings("unchecked") 873 public List<CoordinatorActionBean> call() throws StoreException { 874 List<CoordinatorActionBean> actions; 875 try { 876 Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_DATES"); 877 q.setParameter("jobId", jobId); 878 q.setParameter("startTime", new Timestamp(startDate.getTime())); 879 q.setParameter("endTime", new Timestamp(endDate.getTime())); 880 actions = q.getResultList(); 881 882 List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>(); 883 for (CoordinatorActionBean a : actions) { 884 CoordinatorActionBean aa = getBeanForRunningCoordAction(a); 885 actionList.add(aa); 886 } 887 return actionList; 888 } 889 catch (IllegalStateException e) { 890 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 891 } 892 } 893 }); 894 return actions; 895 } 896 897 /** 898 * Get coordinator action bean for given date 899 * 900 * @param nominalTime 901 * @return CoordinatorActionBean 902 * @throws StoreException 903 */ 904 public CoordinatorActionBean getCoordActionForNominalTime(final String jobId, final Date nominalTime) 905 throws StoreException { 906 CoordinatorActionBean action = doOperation("getCoordActionForNominalTime", 907 new Callable<CoordinatorActionBean>() { 908 @SuppressWarnings("unchecked") 909 public CoordinatorActionBean call() throws StoreException { 910 List<CoordinatorActionBean> actions; 911 Query q = entityManager.createNamedQuery("GET_ACTION_FOR_NOMINALTIME"); 912 q.setParameter("jobId", jobId); 913 q.setParameter("nominalTime", new Timestamp(nominalTime.getTime())); 914 actions = q.getResultList(); 915 916 CoordinatorActionBean action = null; 917 if (actions.size() > 0) { 918 action = actions.get(0); 919 } 920 else { 921 throw new StoreException(ErrorCode.E0605, DateUtils.formatDateOozieTZ(nominalTime)); 922 } 923 return getBeanForRunningCoordAction(action); 924 } 925 }); 926 return action; 927 } 928 929 public List<String> getRecoveryActionsGroupByJobId(final long checkAgeSecs) throws StoreException { 930 List<String> jobids = doOperation("getRecoveryActionsGroupByJobId", new Callable<List<String>>() { 931 @SuppressWarnings("unchecked") 932 public List<String> call() throws StoreException { 933 List<String> jobids = new ArrayList<String>(); 934 try { 935 Query q = entityManager.createNamedQuery("GET_READY_ACTIONS_GROUP_BY_JOBID"); 936 Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000); 937 q.setParameter(1, ts); 938 List<Object[]> list = q.getResultList(); 939 940 for (Object[] arr : list) { 941 if (arr != null && arr[0] != null) { 942 jobids.add((String) arr[0]); 943 } 944 } 945 946 return jobids; 947 } 948 catch (IllegalStateException e) { 949 throw new StoreException(ErrorCode.E0601, e.getMessage(), e); 950 } 951 } 952 }); 953 return jobids; 954 } 955 }