001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.coord; 019 020 import java.io.IOException; 021 import java.util.ArrayList; 022 import java.util.Calendar; 023 import java.util.Date; 024 import java.util.List; 025 import java.util.TimeZone; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.hadoop.fs.Path; 029 030 import org.apache.oozie.client.OozieClient; 031 import org.apache.oozie.util.DateUtils; 032 import org.apache.oozie.util.ELEvaluator; 033 import org.apache.oozie.util.ParamChecker; 034 import org.apache.oozie.util.XLog; 035 import org.apache.oozie.service.HadoopAccessorException; 036 import org.apache.oozie.service.Services; 037 import org.apache.oozie.service.HadoopAccessorService; 038 039 /** 040 * This class implements the EL function related to coordinator 041 */ 042 043 public class CoordELFunctions { 044 final private static XLog LOG = XLog.getLog(CoordELFunctions.class); 045 final private static String DATASET = "oozie.coord.el.dataset.bean"; 046 final private static String COORD_ACTION = "oozie.coord.el.app.bean"; 047 final public static String CONFIGURATION = "oozie.coord.el.conf"; 048 final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time"; 049 // INSTANCE_SEPARATOR is used to separate multiple directories into one tag. 050 final public static String INSTANCE_SEPARATOR = "#"; 051 final public static String DIR_SEPARATOR = ","; 052 // TODO: in next release, support flexibility 053 private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS"; 054 055 /** 056 * Used in defining the frequency in 'day' unit. <p/> domain: <code> val > 0</code> and should be integer. 057 * 058 * @param val frequency in number of days. 059 * @return number of days and also set the frequency timeunit to "day" 060 */ 061 public static int ph1_coord_days(int val) { 062 val = ParamChecker.checkGTZero(val, "n"); 063 ELEvaluator eval = ELEvaluator.getCurrent(); 064 eval.setVariable("timeunit", TimeUnit.DAY); 065 eval.setVariable("endOfDuration", TimeUnit.NONE); 066 return val; 067 } 068 069 /** 070 * Used in defining the frequency in 'month' unit. <p/> domain: <code> val > 0</code> and should be integer. 071 * 072 * @param val frequency in number of months. 073 * @return number of months and also set the frequency timeunit to "month" 074 */ 075 public static int ph1_coord_months(int val) { 076 val = ParamChecker.checkGTZero(val, "n"); 077 ELEvaluator eval = ELEvaluator.getCurrent(); 078 eval.setVariable("timeunit", TimeUnit.MONTH); 079 eval.setVariable("endOfDuration", TimeUnit.NONE); 080 return val; 081 } 082 083 /** 084 * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val > 0</code> and should 085 * be integer. 086 * 087 * @param val frequency in number of hours. 088 * @return number of minutes and also set the frequency timeunit to "minute" 089 */ 090 public static int ph1_coord_hours(int val) { 091 val = ParamChecker.checkGTZero(val, "n"); 092 ELEvaluator eval = ELEvaluator.getCurrent(); 093 eval.setVariable("timeunit", TimeUnit.MINUTE); 094 eval.setVariable("endOfDuration", TimeUnit.NONE); 095 return val * 60; 096 } 097 098 /** 099 * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val > 0</code> and should be integer. 100 * 101 * @param val frequency in number of minutes. 102 * @return number of minutes and also set the frequency timeunit to "minute" 103 */ 104 public static int ph1_coord_minutes(int val) { 105 val = ParamChecker.checkGTZero(val, "n"); 106 ELEvaluator eval = ELEvaluator.getCurrent(); 107 eval.setVariable("timeunit", TimeUnit.MINUTE); 108 eval.setVariable("endOfDuration", TimeUnit.NONE); 109 return val; 110 } 111 112 /** 113 * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will 114 * start at 00:00 hour of each day. <p/> domain: <code> val > 0</code> and should be integer. 115 * 116 * @param val frequency in number of days. 117 * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day" 118 */ 119 public static int ph1_coord_endOfDays(int val) { 120 val = ParamChecker.checkGTZero(val, "n"); 121 ELEvaluator eval = ELEvaluator.getCurrent(); 122 eval.setVariable("timeunit", TimeUnit.DAY); 123 eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY); 124 return val; 125 } 126 127 /** 128 * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will 129 * start at first day of each month at 00:00 hour. <p/> domain: <code> val > 0</code> and should be integer. 130 * 131 * @param val: frequency in number of months. 132 * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month" 133 */ 134 public static int ph1_coord_endOfMonths(int val) { 135 val = ParamChecker.checkGTZero(val, "n"); 136 ELEvaluator eval = ELEvaluator.getCurrent(); 137 eval.setVariable("timeunit", TimeUnit.MONTH); 138 eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH); 139 return val; 140 } 141 142 /** 143 * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/> 144 * 1. Timezone of both dataset and job <p/> 2. Action creation Time 145 * 146 * @return difference in minutes (DataSet TZ Offset - Application TZ offset) 147 */ 148 public static int ph2_coord_tzOffset() { 149 Date actionCreationTime = getActionCreationtime(); 150 TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ"); 151 TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ"); 152 // Apply the TZ into Calendar object 153 Calendar dsTime = Calendar.getInstance(dsTZ); 154 dsTime.setTime(actionCreationTime); 155 Calendar jobTime = Calendar.getInstance(jobTZ); 156 jobTime.setTime(actionCreationTime); 157 return (dsTime.get(Calendar.ZONE_OFFSET) - jobTime.get(Calendar.ZONE_OFFSET)) / (1000 * 60); 158 } 159 160 public static int ph3_coord_tzOffset() { 161 return ph2_coord_tzOffset(); 162 } 163 164 /** 165 * Returns the a date string while given a base date in 'strBaseDate', 166 * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH). 167 * 168 * @param strBaseDate -- base date 169 * @param offset -- any number 170 * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH 171 * @return date string 172 * @throws Exception 173 */ 174 public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 175 Calendar baseCalDate = DateUtils.getCalendar(strBaseDate); 176 StringBuilder buffer = new StringBuilder(); 177 baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset); 178 buffer.append(DateUtils.formatDateOozieTZ(baseCalDate)); 179 return buffer.toString(); 180 } 181 182 public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 183 return ph2_coord_dateOffset(strBaseDate, offset, unit); 184 } 185 186 /** 187 * Determine the date-time in Oozie processing timezone of n-th future available dataset instance 188 * from nominal Time but not beyond the instance specified as 'instance. 189 * <p/> 190 * It depends on: 191 * <p/> 192 * 1. Data set frequency 193 * <p/> 194 * 2. Data set Time unit (day, month, minute) 195 * <p/> 196 * 3. Data set Time zone/DST 197 * <p/> 198 * 4. End Day/Month flag 199 * <p/> 200 * 5. Data set initial instance 201 * <p/> 202 * 6. Action Creation Time 203 * <p/> 204 * 7. Existence of dataset's directory 205 * 206 * @param n :instance count 207 * <p/> 208 * domain: n >= 0, n is integer 209 * @param instance: How many future instance it should check? value should 210 * be >=0 211 * @return date-time in Oozie processing timezone of the n-th instance 212 * <p/> 213 * @throws Exception 214 */ 215 public static String ph3_coord_future(int n, int instance) throws Exception { 216 ParamChecker.checkGEZero(n, "future:n"); 217 ParamChecker.checkGTZero(instance, "future:instance"); 218 if (isSyncDataSet()) {// For Sync Dataset 219 return coord_future_sync(n, instance); 220 } 221 else { 222 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 223 } 224 } 225 226 /** 227 * Determine the date-time in Oozie processing timezone of the future available dataset instances 228 * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'. 229 * <p/> 230 * It depends on: 231 * <p/> 232 * 1. Data set frequency 233 * <p/> 234 * 2. Data set Time unit (day, month, minute) 235 * <p/> 236 * 3. Data set Time zone/DST 237 * <p/> 238 * 4. End Day/Month flag 239 * <p/> 240 * 5. Data set initial instance 241 * <p/> 242 * 6. Action Creation Time 243 * <p/> 244 * 7. Existence of dataset's directory 245 * 246 * @param start : start instance offset 247 * <p/> 248 * domain: start >= 0, start is integer 249 * @param end : end instance offset 250 * <p/> 251 * domain: end >= 0, end is integer 252 * @param instance: How many future instance it should check? value should 253 * be >=0 254 * @return date-time in Oozie processing timezone of the instances from start to end offsets 255 * delimited by comma. 256 * <p/> 257 * @throws Exception 258 */ 259 public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception { 260 ParamChecker.checkGEZero(start, "future:n"); 261 ParamChecker.checkGEZero(end, "future:n"); 262 ParamChecker.checkGTZero(instance, "future:instance"); 263 if (isSyncDataSet()) {// For Sync Dataset 264 return coord_futureRange_sync(start, end, instance); 265 } 266 else { 267 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 268 } 269 } 270 271 private static String coord_future_sync(int n, int instance) throws Exception { 272 return coord_futureRange_sync(n, n, instance); 273 } 274 275 private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception { 276 ELEvaluator eval = ELEvaluator.getCurrent(); 277 String retVal = ""; 278 int datasetFrequency = (int) getDSFrequency();// in minutes 279 TimeUnit dsTimeUnit = getDSTimeUnit(); 280 int[] instCount = new int[1]; 281 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 282 StringBuilder resolvedInstances = new StringBuilder(); 283 StringBuilder resolvedURIPaths = new StringBuilder(); 284 if (nominalInstanceCal != null) { 285 Calendar initInstance = getInitialInstanceCal(); 286 nominalInstanceCal = (Calendar) initInstance.clone(); 287 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 288 289 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 290 if (ds == null) { 291 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 292 } 293 String uriTemplate = ds.getUriTemplate(); 294 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 295 if (conf == null) { 296 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 297 } 298 int available = 0, checkedInstance = 0; 299 boolean resolved = false; 300 String user = ParamChecker 301 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 302 String doneFlag = ds.getDoneFlag(); 303 while (instance >= checkedInstance) { 304 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 305 String uriPath = uriEval.evaluate(uriTemplate, String.class); 306 String pathWithDoneFlag = uriPath; 307 if (doneFlag.length() > 0) { 308 pathWithDoneFlag += "/" + doneFlag; 309 } 310 if (isPathAvailable(pathWithDoneFlag, user, null, conf)) { 311 LOG.debug("Found future(" + available + "): " + pathWithDoneFlag); 312 if (available == endOffset) { 313 LOG.debug("Matched future(" + available + "): " + pathWithDoneFlag); 314 resolved = true; 315 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 316 resolvedURIPaths.append(uriPath); 317 retVal = resolvedInstances.toString(); 318 eval.setVariable("resolved_path", resolvedURIPaths.toString()); 319 break; 320 } else if (available >= startOffset) { 321 LOG.debug("Matched future(" + available + "): " + pathWithDoneFlag); 322 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR); 323 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); 324 } 325 available++; 326 } 327 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), 328 // -datasetFrequency); 329 nominalInstanceCal = (Calendar) initInstance.clone(); 330 instCount[0]++; 331 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 332 checkedInstance++; 333 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 334 } 335 if (!resolved) { 336 // return unchanged future function with variable 'is_resolved' 337 // to 'false' 338 eval.setVariable("is_resolved", Boolean.FALSE); 339 if (startOffset == endOffset) { 340 retVal = "${coord:future(" + startOffset + ", " + instance + ")}"; 341 } 342 else { 343 retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}"; 344 } 345 } 346 else { 347 eval.setVariable("is_resolved", Boolean.TRUE); 348 } 349 } 350 else {// No feasible nominal time 351 eval.setVariable("is_resolved", Boolean.TRUE); 352 retVal = ""; 353 } 354 return retVal; 355 } 356 357 /** 358 * Return nominal time or Action Creation Time. 359 * <p/> 360 * 361 * @return coordinator action creation or materialization date time 362 * @throws Exception if unable to format the Date object to String 363 */ 364 public static String ph2_coord_nominalTime() throws Exception { 365 ELEvaluator eval = ELEvaluator.getCurrent(); 366 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 367 "Coordinator Action"); 368 return DateUtils.formatDateOozieTZ(action.getNominalTime()); 369 } 370 371 public static String ph3_coord_nominalTime() throws Exception { 372 return ph2_coord_nominalTime(); 373 } 374 375 /** 376 * Convert from standard date-time formatting to a desired format. 377 * <p/> 378 * @param dateTimeStr - A timestamp in standard (ISO8601) format. 379 * @param format - A string representing the desired format. 380 * @return coordinator action creation or materialization date time 381 * @throws Exception if unable to format the Date object to String 382 */ 383 public static String ph2_coord_formatTime(String dateTimeStr, String format) 384 throws Exception { 385 Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr); 386 return DateUtils.formatDateCustom(dateTime, format); 387 } 388 389 public static String ph3_coord_formatTime(String dateTimeStr, String format) 390 throws Exception { 391 return ph2_coord_formatTime(dateTimeStr, format); 392 } 393 394 /** 395 * Return Action Id. <p/> 396 * 397 * @return coordinator action Id 398 */ 399 public static String ph2_coord_actionId() throws Exception { 400 ELEvaluator eval = ELEvaluator.getCurrent(); 401 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 402 "Coordinator Action"); 403 return action.getActionId(); 404 } 405 406 public static String ph3_coord_actionId() throws Exception { 407 return ph2_coord_actionId(); 408 } 409 410 /** 411 * Return Job Name. <p/> 412 * 413 * @return coordinator name 414 */ 415 public static String ph2_coord_name() throws Exception { 416 ELEvaluator eval = ELEvaluator.getCurrent(); 417 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 418 "Coordinator Action"); 419 return action.getName(); 420 } 421 422 public static String ph3_coord_name() throws Exception { 423 return ph2_coord_name(); 424 } 425 426 /** 427 * Return Action Start time. <p/> 428 * 429 * @return coordinator action start time 430 * @throws Exception if unable to format the Date object to String 431 */ 432 public static String ph2_coord_actualTime() throws Exception { 433 ELEvaluator eval = ELEvaluator.getCurrent(); 434 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 435 if (coordAction == null) { 436 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 437 } 438 return DateUtils.formatDateOozieTZ(coordAction.getActualTime()); 439 } 440 441 public static String ph3_coord_actualTime() throws Exception { 442 return ph2_coord_actualTime(); 443 } 444 445 /** 446 * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level 447 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of 448 * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something 449 * unresolved, this function will echo back the original function <p/> otherwise it sends the uris. 450 * 451 * @param dataInName : Datain name 452 * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest) 453 * , echo back <p/> the function without resolving the function. 454 */ 455 public static String ph3_coord_dataIn(String dataInName) { 456 String uris = ""; 457 ELEvaluator eval = ELEvaluator.getCurrent(); 458 uris = (String) eval.getVariable(".datain." + dataInName); 459 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 460 if (unresolved != null && unresolved.booleanValue() == true) { 461 return "${coord:dataIn('" + dataInName + "')}"; 462 } 463 return uris; 464 } 465 466 /** 467 * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level 468 * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris. 469 * 470 * @param dataOutName : Dataout name 471 * @return the list of URI's separated by INSTANCE_SEPARATOR 472 */ 473 public static String ph3_coord_dataOut(String dataOutName) { 474 String uris = ""; 475 ELEvaluator eval = ELEvaluator.getCurrent(); 476 uris = (String) eval.getVariable(".dataout." + dataOutName); 477 return uris; 478 } 479 480 /** 481 * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1. 482 * Data set frequency <p/> 2. 483 * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data 484 * set initial instance <p/> 6. Action Creation Time 485 * 486 * @param n instance count domain: n is integer 487 * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is 488 * earlier than Initial-Instance of DS 489 * @throws Exception 490 */ 491 public static String ph2_coord_current(int n) throws Exception { 492 if (isSyncDataSet()) { // For Sync Dataset 493 return coord_current_sync(n); 494 } 495 else { 496 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 497 } 498 } 499 500 /** 501 * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p/> It 502 * depends on: <p> 1. Data set frequency <p/> 2. Data set Time Unit <p/> 3. Data set Time zone/DST 503 * <p/> 4. Data set initial instance <p/> 5. Action Creation Time 504 * 505 * @param n offset amount (integer) 506 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 507 * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time 508 * @throws Exception if there was a problem formatting 509 */ 510 public static String ph2_coord_offset(int n, String timeUnit) throws Exception { 511 if (isSyncDataSet()) { // For Sync Dataset 512 return coord_offset_sync(n, timeUnit); 513 } 514 else { 515 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 516 } 517 } 518 519 /** 520 * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency 521 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 522 * Data set initial instance <p/> 6. Action Creation Time 523 * 524 * @param n instance count <p/> domain: n is integer 525 * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 526 * @throws Exception 527 */ 528 public static int ph2_coord_hoursInDay(int n) throws Exception { 529 int datasetFrequency = (int) getDSFrequency(); 530 // /Calendar nominalInstanceCal = 531 // getCurrentInstance(getActionCreationtime()); 532 Calendar nominalInstanceCal = getEffectiveNominalTime(); 533 if (nominalInstanceCal == null) { 534 return -1; 535 } 536 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 537 /* 538 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 539 * { return -1; } 540 */ 541 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 542 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 543 return DateUtils.hoursInDay(nominalInstanceCal); 544 } 545 546 public static int ph3_coord_hoursInDay(int n) throws Exception { 547 return ph2_coord_hoursInDay(n); 548 } 549 550 /** 551 * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency . 552 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 553 * Data set initial instance <p/> 6. Action Creation Time 554 * 555 * @param n instance count. domain: n is integer 556 * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 557 * @throws Exception 558 */ 559 public static int ph2_coord_daysInMonth(int n) throws Exception { 560 int datasetFrequency = (int) getDSFrequency();// in minutes 561 // Calendar nominalInstanceCal = 562 // getCurrentInstance(getActionCreationtime()); 563 Calendar nominalInstanceCal = getEffectiveNominalTime(); 564 if (nominalInstanceCal == null) { 565 return -1; 566 } 567 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 568 /* 569 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 570 * { return -1; } 571 */ 572 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 573 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 574 return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH); 575 } 576 577 public static int ph3_coord_daysInMonth(int n) throws Exception { 578 return ph2_coord_daysInMonth(n); 579 } 580 581 /** 582 * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends 583 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 584 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of 585 * dataset's directory 586 * 587 * @param n :instance count <p/> domain: n > 0, n is integer 588 * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is 589 * earlier than Initial-Instance of DS 590 * @throws Exception 591 */ 592 public static String ph3_coord_latest(int n) throws Exception { 593 if (n > 0) { 594 throw new IllegalArgumentException("paramter should be <= 0 but it is " + n); 595 } 596 if (isSyncDataSet()) {// For Sync Dataset 597 return coord_latest_sync(n); 598 } 599 else { 600 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 601 } 602 } 603 604 /** 605 * Determine the date-time in Oozie processing timezone of latest available dataset instances 606 * from start to end offsets from the nominal time. <p/> It depends 607 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 608 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of 609 * dataset's directory 610 * 611 * @param start :start instance offset <p/> domain: start > 0, start is integer 612 * @param end :end instance offset <p/> domain: end > 0, end is integer 613 * @return date-time in Oozie processing timezone of the instances from start to end offsets 614 * delimited by comma. <p/> returns 'null' means start offset instance is 615 * earlier than Initial-Instance of DS 616 * @throws Exception 617 */ 618 public static String ph3_coord_latestRange(int start, int end) throws Exception { 619 if (isSyncDataSet()) {// For Sync Dataset 620 return coord_latestRange_sync(start, end); 621 } 622 else { 623 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 624 } 625 } 626 627 /** 628 * Configure an evaluator with data set and application specific information. <p/> Helper method of associating 629 * dataset and application object 630 * 631 * @param evaluator : to set variables 632 * @param ds : Data Set object 633 * @param coordAction : Application instance 634 */ 635 public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) { 636 evaluator.setVariable(COORD_ACTION, coordAction); 637 evaluator.setVariable(DATASET, ds); 638 } 639 640 /** 641 * Helper method to wrap around with "${..}". <p/> 642 * 643 * 644 * @param eval :EL evaluator 645 * @param expr : expression to evaluate 646 * @return Resolved expression or echo back the same expression 647 * @throws Exception 648 */ 649 public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception { 650 try { 651 eval.setVariable(".wrap", null); 652 String result = eval.evaluate(expr, String.class); 653 if (eval.getVariable(".wrap") != null) { 654 return "${" + result + "}"; 655 } 656 else { 657 return result; 658 } 659 } 660 catch (Exception e) { 661 throw new Exception("Unable to evaluate :" + expr + ":\n", e); 662 } 663 } 664 665 // Set of echo functions 666 667 public static String ph1_coord_current_echo(String n) { 668 return echoUnResolved("current", n); 669 } 670 671 public static String ph1_coord_offset_echo(String n, String timeUnit) { 672 return echoUnResolved("offset", n + " , " + timeUnit); 673 } 674 675 public static String ph2_coord_current_echo(String n) { 676 return echoUnResolved("current", n); 677 } 678 679 public static String ph2_coord_offset_echo(String n, String timeUnit) { 680 return echoUnResolved("offset", n + " , " + timeUnit); 681 } 682 683 public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) { 684 return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit); 685 } 686 687 public static String ph1_coord_formatTime_echo(String dateTime, String format) { 688 // Quote the dateTime value since it would contain a ':'. 689 return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format); 690 } 691 692 public static String ph1_coord_latest_echo(String n) { 693 return echoUnResolved("latest", n); 694 } 695 696 public static String ph2_coord_latest_echo(String n) { 697 return ph1_coord_latest_echo(n); 698 } 699 700 public static String ph1_coord_future_echo(String n, String instance) { 701 return echoUnResolved("future", n + ", " + instance + ""); 702 } 703 704 public static String ph2_coord_future_echo(String n, String instance) { 705 return ph1_coord_future_echo(n, instance); 706 } 707 708 public static String ph1_coord_latestRange_echo(String start, String end) { 709 return echoUnResolved("latestRange", start + ", " + end); 710 } 711 712 public static String ph2_coord_latestRange_echo(String start, String end) { 713 return ph1_coord_latestRange_echo(start, end); 714 } 715 716 public static String ph1_coord_futureRange_echo(String start, String end, String instance) { 717 return echoUnResolved("futureRange", start + ", " + end + ", " + instance); 718 } 719 720 public static String ph2_coord_futureRange_echo(String start, String end, String instance) { 721 return ph1_coord_futureRange_echo(start, end, instance); 722 } 723 724 public static String ph1_coord_dataIn_echo(String n) { 725 ELEvaluator eval = ELEvaluator.getCurrent(); 726 String val = (String) eval.getVariable("oozie.dataname." + n); 727 if (val == null || val.equals("data-in") == false) { 728 LOG.error("data_in_name " + n + " is not valid"); 729 throw new RuntimeException("data_in_name " + n + " is not valid"); 730 } 731 return echoUnResolved("dataIn", "'" + n + "'"); 732 } 733 734 public static String ph1_coord_dataOut_echo(String n) { 735 ELEvaluator eval = ELEvaluator.getCurrent(); 736 String val = (String) eval.getVariable("oozie.dataname." + n); 737 if (val == null || val.equals("data-out") == false) { 738 LOG.error("data_out_name " + n + " is not valid"); 739 throw new RuntimeException("data_out_name " + n + " is not valid"); 740 } 741 return echoUnResolved("dataOut", "'" + n + "'"); 742 } 743 744 public static String ph1_coord_nominalTime_echo() { 745 return echoUnResolved("nominalTime", ""); 746 } 747 748 public static String ph1_coord_nominalTime_echo_wrap() { 749 // return "${coord:nominalTime()}"; // no resolution 750 return echoUnResolved("nominalTime", ""); 751 } 752 753 public static String ph1_coord_nominalTime_echo_fixed() { 754 return "2009-03-06T010:00"; // Dummy resolution 755 } 756 757 public static String ph1_coord_actualTime_echo_wrap() { 758 // return "${coord:actualTime()}"; // no resolution 759 return echoUnResolved("actualTime", ""); 760 } 761 762 public static String ph1_coord_actionId_echo() { 763 return echoUnResolved("actionId", ""); 764 } 765 766 public static String ph1_coord_name_echo() { 767 return echoUnResolved("name", ""); 768 } 769 770 // The following echo functions are not used in any phases yet 771 // They are here for future purpose. 772 public static String coord_minutes_echo(String n) { 773 return echoUnResolved("minutes", n); 774 } 775 776 public static String coord_hours_echo(String n) { 777 return echoUnResolved("hours", n); 778 } 779 780 public static String coord_days_echo(String n) { 781 return echoUnResolved("days", n); 782 } 783 784 public static String coord_endOfDay_echo(String n) { 785 return echoUnResolved("endOfDay", n); 786 } 787 788 public static String coord_months_echo(String n) { 789 return echoUnResolved("months", n); 790 } 791 792 public static String coord_endOfMonth_echo(String n) { 793 return echoUnResolved("endOfMonth", n); 794 } 795 796 public static String coord_actualTime_echo() { 797 return echoUnResolved("actualTime", ""); 798 } 799 800 // This echo function will always return "24" for validation only. 801 // This evaluation ****should not**** replace the original XML 802 // Create a temporary string and validate the function 803 // This is **required** for evaluating an expression like 804 // coord:HoursInDay(0) + 3 805 // actual evaluation will happen in phase 2 or phase 3. 806 public static String ph1_coord_hoursInDay_echo(String n) { 807 return "24"; 808 // return echoUnResolved("hoursInDay", n); 809 } 810 811 // This echo function will always return "30" for validation only. 812 // This evaluation ****should not**** replace the original XML 813 // Create a temporary string and validate the function 814 // This is **required** for evaluating an expression like 815 // coord:daysInMonth(0) + 3 816 // actual evaluation will happen in phase 2 or phase 3. 817 public static String ph1_coord_daysInMonth_echo(String n) { 818 // return echoUnResolved("daysInMonth", n); 819 return "30"; 820 } 821 822 // This echo function will always return "3" for validation only. 823 // This evaluation ****should not**** replace the original XML 824 // Create a temporary string and validate the function 825 // This is **required** for evaluating an expression like coord:tzOffset + 2 826 // actual evaluation will happen in phase 2 or phase 3. 827 public static String ph1_coord_tzOffset_echo() { 828 // return echoUnResolved("tzOffset", ""); 829 return "3"; 830 } 831 832 // Local methods 833 /** 834 * @param n 835 * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the 836 * Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset. 837 * @throws Exception 838 */ 839 private static String coord_current_sync(int n) throws Exception { 840 int datasetFrequency = getDSFrequency();// in minutes 841 TimeUnit dsTimeUnit = getDSTimeUnit(); 842 int[] instCount = new int[1];// used as pass by ref 843 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 844 if (nominalInstanceCal == null) { 845 LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is" 846 + " returned. This means that no data is available at the current-instance specified by the user" 847 + " and the user could try modifying his initial-instance to an earlier time."); 848 return ""; 849 } 850 nominalInstanceCal = getInitialInstanceCal(); 851 int absInstanceCount = instCount[0] + n; 852 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency * absInstanceCount); 853 854 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) { 855 LOG.warn("If the initial instance of the dataset is later than the current-instance specified, such as" 856 + " coord:current({0}) in this case, an empty string is returned. This means that no data is" 857 + " available at the current-instance specified by the user and the user could try modifying his" 858 + " initial-instance to an earlier time.", n); 859 return ""; 860 } 861 String str = DateUtils.formatDateOozieTZ(nominalInstanceCal); 862 return str; 863 } 864 865 /** 866 * 867 * @param n offset amount (integer) 868 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 869 * @return the offset time from the effective nominal time <p/> return empty string ("") if the Action_Creation_time or the 870 * offset instance <p/> is earlier than the Initial_Instance of dataset. 871 * @throws Exception 872 */ 873 private static String coord_offset_sync(int n, String timeUnit) throws Exception { 874 Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null); 875 if (rawCal == null) { 876 // warning already logged by resolveOffsetRawTime() 877 return ""; 878 } 879 880 int freq = getDSFrequency(); 881 TimeUnit freqUnit = getDSTimeUnit(); 882 int freqCount = 0; 883 // We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same 884 // time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time 885 // In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true 886 // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that. 887 Calendar cal = getInitialInstanceCal(); 888 if (rawCal.before(cal)) { 889 while (cal.after(rawCal)) { 890 cal.add(freqUnit.getCalendarUnit(), -freq); 891 freqCount--; 892 } 893 } 894 else if (rawCal.after(cal)) { 895 while (cal.before(rawCal)) { 896 cal.add(freqUnit.getCalendarUnit(), freq); 897 freqCount++; 898 } 899 } 900 if (cal.before(rawCal)) { 901 rawCal = cal; 902 } 903 else if (cal.after(rawCal)) { 904 cal.add(freqUnit.getCalendarUnit(), -freq); 905 rawCal = cal; 906 freqCount--; 907 } 908 String rawCalStr = DateUtils.formatDateOozieTZ(rawCal); 909 910 Calendar nominalInstanceCal = getInitialInstanceCal(); 911 nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount); 912 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) { 913 LOG.warn("If the initial instance of the dataset is later than the offset instance" 914 + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no" 915 + " data is available at the offset instance specified by the user and the user could try modifying his" 916 + " initial-instance to an earlier time.", n, timeUnit); 917 return ""; 918 } 919 String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal); 920 921 if (!rawCalStr.equals(nominalCalStr)) { 922 throw new RuntimeException("Shouldn't happen"); 923 } 924 return rawCalStr; 925 } 926 927 /** 928 * @param offset 929 * @return n-th available latest instance Date-Time for SYNC data-set 930 * @throws Exception 931 */ 932 private static String coord_latest_sync(int offset) throws Exception { 933 return coord_latestRange_sync(offset, offset); 934 } 935 936 private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception { 937 if (startOffset > 0) { 938 throw new RuntimeException("For latest there is no meaning " + "of positive instance. n should be <=0" 939 + startOffset); 940 } 941 if (endOffset > 0) { 942 throw new RuntimeException("For latest there is no meaning " + "of positive instance. n should be <=0" 943 + endOffset); 944 } 945 ELEvaluator eval = ELEvaluator.getCurrent(); 946 String retVal = ""; 947 int datasetFrequency = (int) getDSFrequency();// in minutes 948 TimeUnit dsTimeUnit = getDSTimeUnit(); 949 int[] instCount = new int[1]; 950 boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false); 951 Calendar nominalInstanceCal; 952 if (useCurrentTime) { 953 nominalInstanceCal = getCurrentInstance(new Date(), instCount); 954 } 955 else { 956 nominalInstanceCal = getCurrentInstance(getActualTime(), instCount); 957 } 958 StringBuilder resolvedInstances = new StringBuilder(); 959 StringBuilder resolvedURIPaths = new StringBuilder(); 960 if (nominalInstanceCal != null) { 961 Calendar initInstance = getInitialInstanceCal(); 962 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 963 if (ds == null) { 964 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 965 } 966 String uriTemplate = ds.getUriTemplate(); 967 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 968 if (conf == null) { 969 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 970 } 971 int available = 0; 972 boolean resolved = false; 973 String user = ParamChecker 974 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 975 String doneFlag = ds.getDoneFlag(); 976 while (nominalInstanceCal.compareTo(initInstance) >= 0) { 977 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 978 String uriPath = uriEval.evaluate(uriTemplate, String.class); 979 String pathWithDoneFlag = uriPath; 980 if (doneFlag.length() > 0) { 981 pathWithDoneFlag += "/" + doneFlag; 982 } 983 if (isPathAvailable(pathWithDoneFlag, user, null, conf)) { 984 LOG.debug("Found latest(" + available + "): " + pathWithDoneFlag); 985 if (available == startOffset) { 986 LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag); 987 resolved = true; 988 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 989 resolvedURIPaths.append(uriPath); 990 retVal = resolvedInstances.toString(); 991 eval.setVariable("resolved_path", resolvedURIPaths.toString()); 992 break; 993 } else if (available <= endOffset) { 994 LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag); 995 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR); 996 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); 997 } 998 999 available--; 1000 } 1001 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), 1002 // -datasetFrequency); 1003 nominalInstanceCal = (Calendar) initInstance.clone(); 1004 instCount[0]--; 1005 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 1006 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 1007 } 1008 if (!resolved) { 1009 // return unchanged latest function with variable 'is_resolved' 1010 // to 'false' 1011 eval.setVariable("is_resolved", Boolean.FALSE); 1012 if (startOffset == endOffset) { 1013 retVal = "${coord:latest(" + startOffset + ")}"; 1014 } 1015 else { 1016 retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}"; 1017 } 1018 } 1019 else { 1020 eval.setVariable("is_resolved", Boolean.TRUE); 1021 } 1022 } 1023 else {// No feasible nominal time 1024 eval.setVariable("is_resolved", Boolean.FALSE); 1025 } 1026 return retVal; 1027 } 1028 1029 // TODO : Not an efficient way. In a loop environment, we could do something 1030 // outside the loop 1031 /** 1032 * Check whether a URI path exists 1033 * 1034 * @param sPath 1035 * @param conf 1036 * @return 1037 * @throws IOException 1038 */ 1039 1040 private static boolean isPathAvailable(String sPath, String user, String group, Configuration conf) 1041 throws IOException, HadoopAccessorException { 1042 // sPath += "/" + END_OF_OPERATION_INDICATOR_FILE; 1043 Path path = new Path(sPath); 1044 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 1045 Configuration fsConf = has.createJobConf(path.toUri().getAuthority()); 1046 return has.createFileSystem(user, path.toUri(), fsConf).exists(path); 1047 } 1048 1049 /** 1050 * @param tm 1051 * @return a new Evaluator to be used for URI-template evaluation 1052 */ 1053 private static ELEvaluator getUriEvaluator(Calendar tm) { 1054 tm.setTimeZone(DateUtils.getOozieProcessingTimeZone()); 1055 ELEvaluator retEval = new ELEvaluator(); 1056 retEval.setVariable("YEAR", tm.get(Calendar.YEAR)); 1057 retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm 1058 .get(Calendar.MONTH) + 1)); 1059 retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm 1060 .get(Calendar.DAY_OF_MONTH)); 1061 retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm 1062 .get(Calendar.HOUR_OF_DAY)); 1063 retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm 1064 .get(Calendar.MINUTE)); 1065 return retEval; 1066 } 1067 1068 /** 1069 * @return whether a data set is SYNCH or ASYNC 1070 */ 1071 private static boolean isSyncDataSet() { 1072 ELEvaluator eval = ELEvaluator.getCurrent(); 1073 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1074 if (ds == null) { 1075 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1076 } 1077 return ds.getType().equalsIgnoreCase("SYNC"); 1078 } 1079 1080 /** 1081 * Check whether a function should be resolved. 1082 * 1083 * @param functionName 1084 * @param n 1085 * @return null if the functionName needs to be resolved otherwise return the calling function unresolved. 1086 */ 1087 private static String checkIfResolved(String functionName, String n) { 1088 ELEvaluator eval = ELEvaluator.getCurrent(); 1089 String replace = (String) eval.getVariable("resolve_" + functionName); 1090 if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't 1091 // resolve 1092 // return "${coord:" + functionName + "(" + n +")}"; //Unresolved 1093 eval.setVariable(".wrap", "true"); 1094 return "coord:" + functionName + "(" + n + ")"; // Unresolved 1095 } 1096 return null; // Resolved it 1097 } 1098 1099 private static String echoUnResolved(String functionName, String n) { 1100 return echoUnResolvedPre(functionName, n, "coord:"); 1101 } 1102 1103 private static String echoUnResolvedPre(String functionName, String n, String prefix) { 1104 ELEvaluator eval = ELEvaluator.getCurrent(); 1105 eval.setVariable(".wrap", "true"); 1106 return prefix + functionName + "(" + n + ")"; // Unresolved 1107 } 1108 1109 /** 1110 * @return the initial instance of a DataSet in DATE 1111 */ 1112 private static Date getInitialInstance() { 1113 ELEvaluator eval = ELEvaluator.getCurrent(); 1114 return getInitialInstance(eval); 1115 } 1116 1117 /** 1118 * @return the initial instance of a DataSet in DATE 1119 */ 1120 private static Date getInitialInstance(ELEvaluator eval) { 1121 return getInitialInstanceCal(eval).getTime(); 1122 // return ds.getInitInstance(); 1123 } 1124 1125 /** 1126 * @return the initial instance of a DataSet in Calendar 1127 */ 1128 private static Calendar getInitialInstanceCal() { 1129 ELEvaluator eval = ELEvaluator.getCurrent(); 1130 return getInitialInstanceCal(eval); 1131 } 1132 1133 /** 1134 * @return the initial instance of a DataSet in Calendar 1135 */ 1136 private static Calendar getInitialInstanceCal(ELEvaluator eval) { 1137 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1138 if (ds == null) { 1139 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1140 } 1141 Calendar effInitTS = Calendar.getInstance(); 1142 effInitTS.setTime(ds.getInitInstance()); 1143 effInitTS.setTimeZone(ds.getTimeZone()); 1144 // To adjust EOD/EOM 1145 DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval)); 1146 return effInitTS; 1147 // return ds.getInitInstance(); 1148 } 1149 1150 /** 1151 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 1152 */ 1153 private static Date getActionCreationtime() { 1154 ELEvaluator eval = ELEvaluator.getCurrent(); 1155 return getActionCreationtime(eval); 1156 } 1157 1158 /** 1159 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 1160 */ 1161 private static Date getActionCreationtime(ELEvaluator eval) { 1162 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1163 if (coordAction == null) { 1164 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1165 } 1166 return coordAction.getNominalTime(); 1167 } 1168 1169 /** 1170 * @return Actual Time when all the dependencies of an application instance are met. 1171 */ 1172 private static Date getActualTime() { 1173 ELEvaluator eval = ELEvaluator.getCurrent(); 1174 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1175 if (coordAction == null) { 1176 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1177 } 1178 return coordAction.getActualTime(); 1179 } 1180 1181 /** 1182 * @return TimeZone for the application or job. 1183 */ 1184 private static TimeZone getJobTZ() { 1185 ELEvaluator eval = ELEvaluator.getCurrent(); 1186 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1187 if (coordAction == null) { 1188 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1189 } 1190 return coordAction.getTimeZone(); 1191 } 1192 1193 /** 1194 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1195 * 1196 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1197 * the dataset. 1198 */ 1199 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) { 1200 ELEvaluator eval = ELEvaluator.getCurrent(); 1201 return getCurrentInstance(effectiveTime, instanceCount, eval); 1202 } 1203 1204 /** 1205 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1206 * 1207 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1208 * the dataset. 1209 */ 1210 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) { 1211 Date datasetInitialInstance = getInitialInstance(eval); 1212 TimeUnit dsTimeUnit = getDSTimeUnit(eval); 1213 TimeZone dsTZ = getDatasetTZ(eval); 1214 int dsFreq = getDSFrequency(eval); 1215 // Convert Date to Calendar for corresponding TZ 1216 Calendar current = Calendar.getInstance(); 1217 current.setTime(datasetInitialInstance); 1218 current.setTimeZone(dsTZ); 1219 1220 Calendar calEffectiveTime = Calendar.getInstance(); 1221 calEffectiveTime.setTime(effectiveTime); 1222 calEffectiveTime.setTimeZone(dsTZ); 1223 if (instanceCount == null) { // caller doesn't care about this value 1224 instanceCount = new int[1]; 1225 } 1226 instanceCount[0] = 0; 1227 if (current.compareTo(calEffectiveTime) > 0) { 1228 return null; 1229 } 1230 Calendar origCurrent = (Calendar) current.clone(); 1231 while (current.compareTo(calEffectiveTime) <= 0) { 1232 current = (Calendar) origCurrent.clone(); 1233 instanceCount[0]++; 1234 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1235 } 1236 instanceCount[0]--; 1237 1238 current = (Calendar) origCurrent.clone(); 1239 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1240 return current; 1241 } 1242 1243 private static Calendar getEffectiveNominalTime() { 1244 Date datasetInitialInstance = getInitialInstance(); 1245 TimeZone dsTZ = getDatasetTZ(); 1246 // Convert Date to Calendar for corresponding TZ 1247 Calendar current = Calendar.getInstance(); 1248 current.setTime(datasetInitialInstance); 1249 current.setTimeZone(dsTZ); 1250 1251 Calendar calEffectiveTime = Calendar.getInstance(); 1252 calEffectiveTime.setTime(getActionCreationtime()); 1253 calEffectiveTime.setTimeZone(dsTZ); 1254 if (current.compareTo(calEffectiveTime) > 0) { 1255 // Nominal Time < initial Instance 1256 // TODO: getClass() call doesn't work from static method. 1257 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+ 1258 // current.getTime()); 1259 return null; 1260 } 1261 return calEffectiveTime; 1262 } 1263 1264 /** 1265 * @return dataset frequency in minutes 1266 */ 1267 private static int getDSFrequency() { 1268 ELEvaluator eval = ELEvaluator.getCurrent(); 1269 return getDSFrequency(eval); 1270 } 1271 1272 /** 1273 * @return dataset frequency in minutes 1274 */ 1275 private static int getDSFrequency(ELEvaluator eval) { 1276 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1277 if (ds == null) { 1278 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1279 } 1280 return ds.getFrequency(); 1281 } 1282 1283 /** 1284 * @return dataset TimeUnit 1285 */ 1286 private static TimeUnit getDSTimeUnit() { 1287 ELEvaluator eval = ELEvaluator.getCurrent(); 1288 return getDSTimeUnit(eval); 1289 } 1290 1291 /** 1292 * @return dataset TimeUnit 1293 */ 1294 private static TimeUnit getDSTimeUnit(ELEvaluator eval) { 1295 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1296 if (ds == null) { 1297 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1298 } 1299 return ds.getTimeUnit(); 1300 } 1301 1302 /** 1303 * @return dataset TimeZone 1304 */ 1305 private static TimeZone getDatasetTZ() { 1306 ELEvaluator eval = ELEvaluator.getCurrent(); 1307 return getDatasetTZ(eval); 1308 } 1309 1310 /** 1311 * @return dataset TimeZone 1312 */ 1313 private static TimeZone getDatasetTZ(ELEvaluator eval) { 1314 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1315 if (ds == null) { 1316 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1317 } 1318 return ds.getTimeZone(); 1319 } 1320 1321 /** 1322 * @return dataset TimeUnit 1323 */ 1324 private static TimeUnit getDSEndOfFlag() { 1325 ELEvaluator eval = ELEvaluator.getCurrent(); 1326 return getDSEndOfFlag(eval); 1327 } 1328 1329 /** 1330 * @return dataset TimeUnit 1331 */ 1332 private static TimeUnit getDSEndOfFlag(ELEvaluator eval) { 1333 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1334 if (ds == null) { 1335 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1336 } 1337 return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration(); 1338 } 1339 1340 /** 1341 * Return a job configuration property for the coordinator. 1342 * 1343 * @param property property name. 1344 * @return the value of the property, <code>null</code> if the property is undefined. 1345 */ 1346 public static String coord_conf(String property) { 1347 ELEvaluator eval = ELEvaluator.getCurrent(); 1348 return (String) eval.getVariable(property); 1349 } 1350 1351 /** 1352 * Return the user that submitted the coordinator job. 1353 * 1354 * @return the user that submitted the coordinator job. 1355 */ 1356 public static String coord_user() { 1357 ELEvaluator eval = ELEvaluator.getCurrent(); 1358 return (String) eval.getVariable(OozieClient.USER_NAME); 1359 } 1360 1361 /** 1362 * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur 1363 * between them. The caller should make sure that startCal is earlier than endCal. 1364 * <p> 1365 * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal 1366 * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60 1367 * 1368 * @param startCal The earlier offset time 1369 * @param endCal The later offset time 1370 * @param eval The ELEvaluator to use; cannot be null 1371 * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal 1372 */ 1373 public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) { 1374 List<Integer> expandedFreqs = new ArrayList<Integer>(); 1375 // Use eval because the "current" eval isn't set 1376 int freq = getDSFrequency(eval); 1377 TimeUnit freqUnit = getDSTimeUnit(eval); 1378 Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval); 1379 int totalFreq = 0; 1380 if (startCal.before(cal)) { 1381 while (cal.after(startCal)) { 1382 cal.add(freqUnit.getCalendarUnit(), -freq); 1383 totalFreq += -freq; 1384 } 1385 if (cal.before(startCal)) { 1386 cal.add(freqUnit.getCalendarUnit(), freq); 1387 totalFreq += freq; 1388 } 1389 } 1390 else if (startCal.after(cal)) { 1391 while (cal.before(startCal)) { 1392 cal.add(freqUnit.getCalendarUnit(), freq); 1393 totalFreq += freq; 1394 } 1395 } 1396 // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the 1397 // effective nominal time. Now we can find all of the instances that occur between startCal and endCal, inclusive. 1398 while (cal.before(endCal) || cal.equals(endCal)) { 1399 expandedFreqs.add(totalFreq); 1400 cal.add(freqUnit.getCalendarUnit(), freq); 1401 totalFreq += freq; 1402 } 1403 return expandedFreqs; 1404 } 1405 1406 /** 1407 * Resolve the offset time from the effective nominal time 1408 * 1409 * @param n offset amount (integer) 1410 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 1411 * @param eval The ELEvaluator to use; or null to use the "current" eval 1412 * @return A Calendar of the offset time 1413 */ 1414 public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) { 1415 // Use eval if given (for when the "current" eval isn't set) 1416 Calendar cal; 1417 if (eval == null) { 1418 cal = getCurrentInstance(getActionCreationtime(), null); 1419 } 1420 else { 1421 cal = getCurrentInstance(getActionCreationtime(eval), null, eval); 1422 } 1423 if (cal == null) { 1424 LOG.warn("If the initial instance of the dataset is later than the nominal time, an" 1425 + " empty string is returned. This means that no data is available at the offset instance specified by the user" 1426 + " and the user could try modifying his or her initial-instance to an earlier time."); 1427 return null; 1428 } 1429 cal.add(timeUnit.getCalendarUnit(), n); 1430 return cal; 1431 } 1432 }