001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.coord; 019 020 import java.net.URI; 021 import java.util.ArrayList; 022 import java.util.Calendar; 023 import java.util.Date; 024 import java.util.List; 025 import java.util.TimeZone; 026 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.oozie.client.OozieClient; 029 import org.apache.oozie.dependency.URIHandler.Context; 030 import org.apache.oozie.dependency.URIHandler; 031 import org.apache.oozie.util.DateUtils; 032 import org.apache.oozie.util.ELEvaluator; 033 import org.apache.oozie.util.ParamChecker; 034 import org.apache.oozie.util.XLog; 035 import org.apache.oozie.service.Services; 036 import org.apache.oozie.service.URIHandlerService; 037 038 /** 039 * This class implements the EL function related to coordinator 040 */ 041 042 public class CoordELFunctions { 043 final public static String DATASET = "oozie.coord.el.dataset.bean"; 044 final public static String COORD_ACTION = "oozie.coord.el.app.bean"; 045 final public static String CONFIGURATION = "oozie.coord.el.conf"; 046 final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time"; 047 // INSTANCE_SEPARATOR is used to separate multiple directories into one tag. 048 final public static String INSTANCE_SEPARATOR = "#"; 049 final public static String DIR_SEPARATOR = ","; 050 // TODO: in next release, support flexibility 051 private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS"; 052 053 /** 054 * Used in defining the frequency in 'day' unit. <p/> domain: <code> val > 0</code> and should be integer. 055 * 056 * @param val frequency in number of days. 057 * @return number of days and also set the frequency timeunit to "day" 058 */ 059 public static int ph1_coord_days(int val) { 060 val = ParamChecker.checkGTZero(val, "n"); 061 ELEvaluator eval = ELEvaluator.getCurrent(); 062 eval.setVariable("timeunit", TimeUnit.DAY); 063 eval.setVariable("endOfDuration", TimeUnit.NONE); 064 return val; 065 } 066 067 /** 068 * Used in defining the frequency in 'month' unit. <p/> domain: <code> val > 0</code> and should be integer. 069 * 070 * @param val frequency in number of months. 071 * @return number of months and also set the frequency timeunit to "month" 072 */ 073 public static int ph1_coord_months(int val) { 074 val = ParamChecker.checkGTZero(val, "n"); 075 ELEvaluator eval = ELEvaluator.getCurrent(); 076 eval.setVariable("timeunit", TimeUnit.MONTH); 077 eval.setVariable("endOfDuration", TimeUnit.NONE); 078 return val; 079 } 080 081 /** 082 * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val > 0</code> and should 083 * be integer. 084 * 085 * @param val frequency in number of hours. 086 * @return number of minutes and also set the frequency timeunit to "minute" 087 */ 088 public static int ph1_coord_hours(int val) { 089 val = ParamChecker.checkGTZero(val, "n"); 090 ELEvaluator eval = ELEvaluator.getCurrent(); 091 eval.setVariable("timeunit", TimeUnit.MINUTE); 092 eval.setVariable("endOfDuration", TimeUnit.NONE); 093 return val * 60; 094 } 095 096 /** 097 * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val > 0</code> and should be integer. 098 * 099 * @param val frequency in number of minutes. 100 * @return number of minutes and also set the frequency timeunit to "minute" 101 */ 102 public static int ph1_coord_minutes(int val) { 103 val = ParamChecker.checkGTZero(val, "n"); 104 ELEvaluator eval = ELEvaluator.getCurrent(); 105 eval.setVariable("timeunit", TimeUnit.MINUTE); 106 eval.setVariable("endOfDuration", TimeUnit.NONE); 107 return val; 108 } 109 110 /** 111 * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will 112 * start at 00:00 hour of each day. <p/> domain: <code> val > 0</code> and should be integer. 113 * 114 * @param val frequency in number of days. 115 * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day" 116 */ 117 public static int ph1_coord_endOfDays(int val) { 118 val = ParamChecker.checkGTZero(val, "n"); 119 ELEvaluator eval = ELEvaluator.getCurrent(); 120 eval.setVariable("timeunit", TimeUnit.DAY); 121 eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY); 122 return val; 123 } 124 125 /** 126 * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will 127 * start at first day of each month at 00:00 hour. <p/> domain: <code> val > 0</code> and should be integer. 128 * 129 * @param val: frequency in number of months. 130 * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month" 131 */ 132 public static int ph1_coord_endOfMonths(int val) { 133 val = ParamChecker.checkGTZero(val, "n"); 134 ELEvaluator eval = ELEvaluator.getCurrent(); 135 eval.setVariable("timeunit", TimeUnit.MONTH); 136 eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH); 137 return val; 138 } 139 140 /** 141 * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/> 142 * 1. Timezone of both dataset and job <p/> 2. Action creation Time 143 * 144 * @return difference in minutes (DataSet TZ Offset - Application TZ offset) 145 */ 146 public static int ph2_coord_tzOffset() { 147 Date actionCreationTime = getActionCreationtime(); 148 TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ"); 149 TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ"); 150 // Apply the TZ into Calendar object 151 Calendar dsTime = Calendar.getInstance(dsTZ); 152 dsTime.setTime(actionCreationTime); 153 Calendar jobTime = Calendar.getInstance(jobTZ); 154 jobTime.setTime(actionCreationTime); 155 return (dsTime.get(Calendar.ZONE_OFFSET) - jobTime.get(Calendar.ZONE_OFFSET)) / (1000 * 60); 156 } 157 158 public static int ph3_coord_tzOffset() { 159 return ph2_coord_tzOffset(); 160 } 161 162 /** 163 * Returns the a date string while given a base date in 'strBaseDate', 164 * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH). 165 * 166 * @param strBaseDate -- base date 167 * @param offset -- any number 168 * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH 169 * @return date string 170 * @throws Exception 171 */ 172 public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 173 Calendar baseCalDate = DateUtils.getCalendar(strBaseDate); 174 StringBuilder buffer = new StringBuilder(); 175 baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset); 176 buffer.append(DateUtils.formatDateOozieTZ(baseCalDate)); 177 return buffer.toString(); 178 } 179 180 public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 181 return ph2_coord_dateOffset(strBaseDate, offset, unit); 182 } 183 184 /** 185 * Determine the date-time in Oozie processing timezone of n-th future available dataset instance 186 * from nominal Time but not beyond the instance specified as 'instance. 187 * <p/> 188 * It depends on: 189 * <p/> 190 * 1. Data set frequency 191 * <p/> 192 * 2. Data set Time unit (day, month, minute) 193 * <p/> 194 * 3. Data set Time zone/DST 195 * <p/> 196 * 4. End Day/Month flag 197 * <p/> 198 * 5. Data set initial instance 199 * <p/> 200 * 6. Action Creation Time 201 * <p/> 202 * 7. Existence of dataset's directory 203 * 204 * @param n :instance count 205 * <p/> 206 * domain: n >= 0, n is integer 207 * @param instance: How many future instance it should check? value should 208 * be >=0 209 * @return date-time in Oozie processing timezone of the n-th instance 210 * <p/> 211 * @throws Exception 212 */ 213 public static String ph3_coord_future(int n, int instance) throws Exception { 214 ParamChecker.checkGEZero(n, "future:n"); 215 ParamChecker.checkGTZero(instance, "future:instance"); 216 if (isSyncDataSet()) {// For Sync Dataset 217 return coord_future_sync(n, instance); 218 } 219 else { 220 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 221 } 222 } 223 224 /** 225 * Determine the date-time in Oozie processing timezone of the future available dataset instances 226 * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'. 227 * <p/> 228 * It depends on: 229 * <p/> 230 * 1. Data set frequency 231 * <p/> 232 * 2. Data set Time unit (day, month, minute) 233 * <p/> 234 * 3. Data set Time zone/DST 235 * <p/> 236 * 4. End Day/Month flag 237 * <p/> 238 * 5. Data set initial instance 239 * <p/> 240 * 6. Action Creation Time 241 * <p/> 242 * 7. Existence of dataset's directory 243 * 244 * @param start : start instance offset 245 * <p/> 246 * domain: start >= 0, start is integer 247 * @param end : end instance offset 248 * <p/> 249 * domain: end >= 0, end is integer 250 * @param instance: How many future instance it should check? value should 251 * be >=0 252 * @return date-time in Oozie processing timezone of the instances from start to end offsets 253 * delimited by comma. 254 * <p/> 255 * @throws Exception 256 */ 257 public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception { 258 ParamChecker.checkGEZero(start, "future:n"); 259 ParamChecker.checkGEZero(end, "future:n"); 260 ParamChecker.checkGTZero(instance, "future:instance"); 261 if (isSyncDataSet()) {// For Sync Dataset 262 return coord_futureRange_sync(start, end, instance); 263 } 264 else { 265 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 266 } 267 } 268 269 private static String coord_future_sync(int n, int instance) throws Exception { 270 return coord_futureRange_sync(n, n, instance); 271 } 272 273 private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception { 274 final XLog LOG = XLog.getLog(CoordELFunctions.class); 275 final Thread currentThread = Thread.currentThread(); 276 ELEvaluator eval = ELEvaluator.getCurrent(); 277 String retVal = ""; 278 int datasetFrequency = (int) getDSFrequency();// in minutes 279 TimeUnit dsTimeUnit = getDSTimeUnit(); 280 int[] instCount = new int[1]; 281 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 282 StringBuilder resolvedInstances = new StringBuilder(); 283 StringBuilder resolvedURIPaths = new StringBuilder(); 284 if (nominalInstanceCal != null) { 285 Calendar initInstance = getInitialInstanceCal(); 286 nominalInstanceCal = (Calendar) initInstance.clone(); 287 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 288 289 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 290 if (ds == null) { 291 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 292 } 293 String uriTemplate = ds.getUriTemplate(); 294 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 295 if (conf == null) { 296 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 297 } 298 int available = 0, checkedInstance = 0; 299 boolean resolved = false; 300 String user = ParamChecker 301 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 302 String doneFlag = ds.getDoneFlag(); 303 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 304 URIHandler uriHandler = null; 305 Context uriContext = null; 306 try { 307 while (instance >= checkedInstance && !currentThread.isInterrupted()) { 308 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 309 String uriPath = uriEval.evaluate(uriTemplate, String.class); 310 if (uriHandler == null) { 311 URI uri = new URI(uriPath); 312 uriHandler = uriService.getURIHandler(uri); 313 uriContext = uriHandler.getContext(uri, conf, user); 314 } 315 String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); 316 if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { 317 if (available == endOffset) { 318 LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag); 319 resolved = true; 320 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 321 resolvedURIPaths.append(uriPath); 322 retVal = resolvedInstances.toString(); 323 eval.setVariable("resolved_path", resolvedURIPaths.toString()); 324 break; 325 } 326 else if (available >= startOffset) { 327 LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag); 328 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append( 329 INSTANCE_SEPARATOR); 330 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); 331 } 332 available++; 333 } 334 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency); 335 nominalInstanceCal = (Calendar) initInstance.clone(); 336 instCount[0]++; 337 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 338 checkedInstance++; 339 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 340 } 341 } 342 finally { 343 if (uriContext != null) { 344 uriContext.destroy(); 345 } 346 } 347 if (!resolved) { 348 // return unchanged future function with variable 'is_resolved' 349 // to 'false' 350 eval.setVariable("is_resolved", Boolean.FALSE); 351 if (startOffset == endOffset) { 352 retVal = "${coord:future(" + startOffset + ", " + instance + ")}"; 353 } 354 else { 355 retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}"; 356 } 357 } 358 else { 359 eval.setVariable("is_resolved", Boolean.TRUE); 360 } 361 } 362 else {// No feasible nominal time 363 eval.setVariable("is_resolved", Boolean.TRUE); 364 retVal = ""; 365 } 366 return retVal; 367 } 368 369 /** 370 * Return nominal time or Action Creation Time. 371 * <p/> 372 * 373 * @return coordinator action creation or materialization date time 374 * @throws Exception if unable to format the Date object to String 375 */ 376 public static String ph2_coord_nominalTime() throws Exception { 377 ELEvaluator eval = ELEvaluator.getCurrent(); 378 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 379 "Coordinator Action"); 380 return DateUtils.formatDateOozieTZ(action.getNominalTime()); 381 } 382 383 public static String ph3_coord_nominalTime() throws Exception { 384 return ph2_coord_nominalTime(); 385 } 386 387 /** 388 * Convert from standard date-time formatting to a desired format. 389 * <p/> 390 * @param dateTimeStr - A timestamp in standard (ISO8601) format. 391 * @param format - A string representing the desired format. 392 * @return coordinator action creation or materialization date time 393 * @throws Exception if unable to format the Date object to String 394 */ 395 public static String ph2_coord_formatTime(String dateTimeStr, String format) 396 throws Exception { 397 Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr); 398 return DateUtils.formatDateCustom(dateTime, format); 399 } 400 401 public static String ph3_coord_formatTime(String dateTimeStr, String format) 402 throws Exception { 403 return ph2_coord_formatTime(dateTimeStr, format); 404 } 405 406 /** 407 * Return Action Id. <p/> 408 * 409 * @return coordinator action Id 410 */ 411 public static String ph2_coord_actionId() throws Exception { 412 ELEvaluator eval = ELEvaluator.getCurrent(); 413 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 414 "Coordinator Action"); 415 return action.getActionId(); 416 } 417 418 public static String ph3_coord_actionId() throws Exception { 419 return ph2_coord_actionId(); 420 } 421 422 /** 423 * Return Job Name. <p/> 424 * 425 * @return coordinator name 426 */ 427 public static String ph2_coord_name() throws Exception { 428 ELEvaluator eval = ELEvaluator.getCurrent(); 429 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 430 "Coordinator Action"); 431 return action.getName(); 432 } 433 434 public static String ph3_coord_name() throws Exception { 435 return ph2_coord_name(); 436 } 437 438 /** 439 * Return Action Start time. <p/> 440 * 441 * @return coordinator action start time 442 * @throws Exception if unable to format the Date object to String 443 */ 444 public static String ph2_coord_actualTime() throws Exception { 445 ELEvaluator eval = ELEvaluator.getCurrent(); 446 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 447 if (coordAction == null) { 448 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 449 } 450 return DateUtils.formatDateOozieTZ(coordAction.getActualTime()); 451 } 452 453 public static String ph3_coord_actualTime() throws Exception { 454 return ph2_coord_actualTime(); 455 } 456 457 /** 458 * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level 459 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of 460 * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something 461 * unresolved, this function will echo back the original function <p/> otherwise it sends the uris. 462 * 463 * @param dataInName : Datain name 464 * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest) 465 * , echo back <p/> the function without resolving the function. 466 */ 467 public static String ph3_coord_dataIn(String dataInName) { 468 String uris = ""; 469 ELEvaluator eval = ELEvaluator.getCurrent(); 470 uris = (String) eval.getVariable(".datain." + dataInName); 471 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 472 if (unresolved != null && unresolved.booleanValue() == true) { 473 return "${coord:dataIn('" + dataInName + "')}"; 474 } 475 return uris; 476 } 477 478 /** 479 * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level 480 * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris. 481 * 482 * @param dataOutName : Dataout name 483 * @return the list of URI's separated by INSTANCE_SEPARATOR 484 */ 485 public static String ph3_coord_dataOut(String dataOutName) { 486 String uris = ""; 487 ELEvaluator eval = ELEvaluator.getCurrent(); 488 uris = (String) eval.getVariable(".dataout." + dataOutName); 489 return uris; 490 } 491 492 /** 493 * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1. 494 * Data set frequency <p/> 2. 495 * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data 496 * set initial instance <p/> 6. Action Creation Time 497 * 498 * @param n instance count domain: n is integer 499 * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is 500 * earlier than Initial-Instance of DS 501 * @throws Exception 502 */ 503 public static String ph2_coord_current(int n) throws Exception { 504 if (isSyncDataSet()) { // For Sync Dataset 505 return coord_current_sync(n); 506 } 507 else { 508 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 509 } 510 } 511 512 /** 513 * Determine the date-time in Oozie processing timezone of current dataset instances 514 * from start to end offsets from the nominal time. <p/> It depends 515 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 516 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time 517 * 518 * @param start :start instance offset <p/> domain: start <= 0, start is integer 519 * @param end :end instance offset <p/> domain: end <= 0, end is integer 520 * @return date-time in Oozie processing timezone of the instances from start to end offsets 521 * delimited by comma. <p/> If the current instance time of the dataset based on the Action Creation Time 522 * is earlier than the Initial-Instance of DS an empty string is returned. 523 * If an instance within the range is earlier than Initial-Instance of DS that instance is ignored 524 * @throws Exception 525 */ 526 public static String ph2_coord_currentRange(int start, int end) throws Exception { 527 if (isSyncDataSet()) { // For Sync Dataset 528 return coord_currentRange_sync(start, end); 529 } 530 else { 531 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 532 } 533 } 534 535 /** 536 * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p/> It 537 * depends on: <p> 1. Data set frequency <p/> 2. Data set Time Unit <p/> 3. Data set Time zone/DST 538 * <p/> 4. Data set initial instance <p/> 5. Action Creation Time 539 * 540 * @param n offset amount (integer) 541 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 542 * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time 543 * @throws Exception if there was a problem formatting 544 */ 545 public static String ph2_coord_offset(int n, String timeUnit) throws Exception { 546 if (isSyncDataSet()) { // For Sync Dataset 547 return coord_offset_sync(n, timeUnit); 548 } 549 else { 550 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 551 } 552 } 553 554 /** 555 * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency 556 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 557 * Data set initial instance <p/> 6. Action Creation Time 558 * 559 * @param n instance count <p/> domain: n is integer 560 * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 561 * @throws Exception 562 */ 563 public static int ph2_coord_hoursInDay(int n) throws Exception { 564 int datasetFrequency = (int) getDSFrequency(); 565 // /Calendar nominalInstanceCal = 566 // getCurrentInstance(getActionCreationtime()); 567 Calendar nominalInstanceCal = getEffectiveNominalTime(); 568 if (nominalInstanceCal == null) { 569 return -1; 570 } 571 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 572 /* 573 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 574 * { return -1; } 575 */ 576 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 577 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 578 return DateUtils.hoursInDay(nominalInstanceCal); 579 } 580 581 public static int ph3_coord_hoursInDay(int n) throws Exception { 582 return ph2_coord_hoursInDay(n); 583 } 584 585 /** 586 * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency . 587 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 588 * Data set initial instance <p/> 6. Action Creation Time 589 * 590 * @param n instance count. domain: n is integer 591 * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 592 * @throws Exception 593 */ 594 public static int ph2_coord_daysInMonth(int n) throws Exception { 595 int datasetFrequency = (int) getDSFrequency();// in minutes 596 // Calendar nominalInstanceCal = 597 // getCurrentInstance(getActionCreationtime()); 598 Calendar nominalInstanceCal = getEffectiveNominalTime(); 599 if (nominalInstanceCal == null) { 600 return -1; 601 } 602 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 603 /* 604 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 605 * { return -1; } 606 */ 607 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 608 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 609 return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH); 610 } 611 612 public static int ph3_coord_daysInMonth(int n) throws Exception { 613 return ph2_coord_daysInMonth(n); 614 } 615 616 /** 617 * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends 618 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 619 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of 620 * dataset's directory 621 * 622 * @param n :instance count <p/> domain: n <= 0, n is integer 623 * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is 624 * earlier than Initial-Instance of DS 625 * @throws Exception 626 */ 627 public static String ph3_coord_latest(int n) throws Exception { 628 ParamChecker.checkLEZero(n, "latest:n"); 629 if (isSyncDataSet()) {// For Sync Dataset 630 return coord_latest_sync(n); 631 } 632 else { 633 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 634 } 635 } 636 637 /** 638 * Determine the date-time in Oozie processing timezone of latest available dataset instances 639 * from start to end offsets from the nominal time. <p/> It depends 640 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 641 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of 642 * dataset's directory 643 * 644 * @param start :start instance offset <p/> domain: start <= 0, start is integer 645 * @param end :end instance offset <p/> domain: end <= 0, end is integer 646 * @return date-time in Oozie processing timezone of the instances from start to end offsets 647 * delimited by comma. <p/> returns 'null' means start offset instance is 648 * earlier than Initial-Instance of DS 649 * @throws Exception 650 */ 651 public static String ph3_coord_latestRange(int start, int end) throws Exception { 652 ParamChecker.checkLEZero(start, "latest:n"); 653 ParamChecker.checkLEZero(end, "latest:n"); 654 if (isSyncDataSet()) {// For Sync Dataset 655 return coord_latestRange_sync(start, end); 656 } 657 else { 658 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 659 } 660 } 661 662 /** 663 * Configure an evaluator with data set and application specific information. <p/> Helper method of associating 664 * dataset and application object 665 * 666 * @param evaluator : to set variables 667 * @param ds : Data Set object 668 * @param coordAction : Application instance 669 */ 670 public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) { 671 evaluator.setVariable(COORD_ACTION, coordAction); 672 evaluator.setVariable(DATASET, ds); 673 } 674 675 /** 676 * Helper method to wrap around with "${..}". <p/> 677 * 678 * 679 * @param eval :EL evaluator 680 * @param expr : expression to evaluate 681 * @return Resolved expression or echo back the same expression 682 * @throws Exception 683 */ 684 public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception { 685 try { 686 eval.setVariable(".wrap", null); 687 String result = eval.evaluate(expr, String.class); 688 if (eval.getVariable(".wrap") != null) { 689 return "${" + result + "}"; 690 } 691 else { 692 return result; 693 } 694 } 695 catch (Exception e) { 696 throw new Exception("Unable to evaluate :" + expr + ":\n", e); 697 } 698 } 699 700 // Set of echo functions 701 702 public static String ph1_coord_current_echo(String n) { 703 return echoUnResolved("current", n); 704 } 705 706 public static String ph1_coord_currentRange_echo(String start, String end) { 707 return echoUnResolved("currentRange", start + ", " + end); 708 } 709 710 public static String ph1_coord_offset_echo(String n, String timeUnit) { 711 return echoUnResolved("offset", n + " , " + timeUnit); 712 } 713 714 public static String ph2_coord_current_echo(String n) { 715 return echoUnResolved("current", n); 716 } 717 718 public static String ph2_coord_currentRange_echo(String start, String end) { 719 return echoUnResolved("currentRange", start + ", " + end); 720 } 721 722 public static String ph2_coord_offset_echo(String n, String timeUnit) { 723 return echoUnResolved("offset", n + " , " + timeUnit); 724 } 725 726 public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) { 727 return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit); 728 } 729 730 public static String ph1_coord_formatTime_echo(String dateTime, String format) { 731 // Quote the dateTime value since it would contain a ':'. 732 return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format); 733 } 734 735 public static String ph1_coord_latest_echo(String n) { 736 return echoUnResolved("latest", n); 737 } 738 739 public static String ph2_coord_latest_echo(String n) { 740 return ph1_coord_latest_echo(n); 741 } 742 743 public static String ph1_coord_future_echo(String n, String instance) { 744 return echoUnResolved("future", n + ", " + instance + ""); 745 } 746 747 public static String ph2_coord_future_echo(String n, String instance) { 748 return ph1_coord_future_echo(n, instance); 749 } 750 751 public static String ph1_coord_latestRange_echo(String start, String end) { 752 return echoUnResolved("latestRange", start + ", " + end); 753 } 754 755 public static String ph2_coord_latestRange_echo(String start, String end) { 756 return ph1_coord_latestRange_echo(start, end); 757 } 758 759 public static String ph1_coord_futureRange_echo(String start, String end, String instance) { 760 return echoUnResolved("futureRange", start + ", " + end + ", " + instance); 761 } 762 763 public static String ph2_coord_futureRange_echo(String start, String end, String instance) { 764 return ph1_coord_futureRange_echo(start, end, instance); 765 } 766 767 public static String ph1_coord_dataIn_echo(String n) { 768 ELEvaluator eval = ELEvaluator.getCurrent(); 769 String val = (String) eval.getVariable("oozie.dataname." + n); 770 if (val == null || val.equals("data-in") == false) { 771 XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid"); 772 throw new RuntimeException("data_in_name " + n + " is not valid"); 773 } 774 return echoUnResolved("dataIn", "'" + n + "'"); 775 } 776 777 public static String ph1_coord_dataOut_echo(String n) { 778 ELEvaluator eval = ELEvaluator.getCurrent(); 779 String val = (String) eval.getVariable("oozie.dataname." + n); 780 if (val == null || val.equals("data-out") == false) { 781 XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid"); 782 throw new RuntimeException("data_out_name " + n + " is not valid"); 783 } 784 return echoUnResolved("dataOut", "'" + n + "'"); 785 } 786 787 public static String ph1_coord_nominalTime_echo() { 788 return echoUnResolved("nominalTime", ""); 789 } 790 791 public static String ph1_coord_nominalTime_echo_wrap() { 792 // return "${coord:nominalTime()}"; // no resolution 793 return echoUnResolved("nominalTime", ""); 794 } 795 796 public static String ph1_coord_nominalTime_echo_fixed() { 797 return "2009-03-06T010:00"; // Dummy resolution 798 } 799 800 public static String ph1_coord_actualTime_echo_wrap() { 801 // return "${coord:actualTime()}"; // no resolution 802 return echoUnResolved("actualTime", ""); 803 } 804 805 public static String ph1_coord_actionId_echo() { 806 return echoUnResolved("actionId", ""); 807 } 808 809 public static String ph1_coord_name_echo() { 810 return echoUnResolved("name", ""); 811 } 812 813 // The following echo functions are not used in any phases yet 814 // They are here for future purpose. 815 public static String coord_minutes_echo(String n) { 816 return echoUnResolved("minutes", n); 817 } 818 819 public static String coord_hours_echo(String n) { 820 return echoUnResolved("hours", n); 821 } 822 823 public static String coord_days_echo(String n) { 824 return echoUnResolved("days", n); 825 } 826 827 public static String coord_endOfDay_echo(String n) { 828 return echoUnResolved("endOfDay", n); 829 } 830 831 public static String coord_months_echo(String n) { 832 return echoUnResolved("months", n); 833 } 834 835 public static String coord_endOfMonth_echo(String n) { 836 return echoUnResolved("endOfMonth", n); 837 } 838 839 public static String coord_actualTime_echo() { 840 return echoUnResolved("actualTime", ""); 841 } 842 843 // This echo function will always return "24" for validation only. 844 // This evaluation ****should not**** replace the original XML 845 // Create a temporary string and validate the function 846 // This is **required** for evaluating an expression like 847 // coord:HoursInDay(0) + 3 848 // actual evaluation will happen in phase 2 or phase 3. 849 public static String ph1_coord_hoursInDay_echo(String n) { 850 return "24"; 851 // return echoUnResolved("hoursInDay", n); 852 } 853 854 // This echo function will always return "30" for validation only. 855 // This evaluation ****should not**** replace the original XML 856 // Create a temporary string and validate the function 857 // This is **required** for evaluating an expression like 858 // coord:daysInMonth(0) + 3 859 // actual evaluation will happen in phase 2 or phase 3. 860 public static String ph1_coord_daysInMonth_echo(String n) { 861 // return echoUnResolved("daysInMonth", n); 862 return "30"; 863 } 864 865 // This echo function will always return "3" for validation only. 866 // This evaluation ****should not**** replace the original XML 867 // Create a temporary string and validate the function 868 // This is **required** for evaluating an expression like coord:tzOffset + 2 869 // actual evaluation will happen in phase 2 or phase 3. 870 public static String ph1_coord_tzOffset_echo() { 871 // return echoUnResolved("tzOffset", ""); 872 return "3"; 873 } 874 875 // Local methods 876 /** 877 * @param n 878 * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the 879 * Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset. 880 * @throws Exception 881 */ 882 private static String coord_current_sync(int n) throws Exception { 883 return coord_currentRange_sync(n, n); 884 } 885 886 private static String coord_currentRange_sync(int start, int end) throws Exception { 887 final XLog LOG = XLog.getLog(CoordELFunctions.class); 888 int datasetFrequency = getDSFrequency();// in minutes 889 TimeUnit dsTimeUnit = getDSTimeUnit(); 890 int[] instCount = new int[1];// used as pass by ref 891 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 892 StringBuilder instanceList = new StringBuilder(); 893 if (nominalInstanceCal == null) { 894 LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is" 895 + " returned. This means that no data is available at the current-instance specified by the user" 896 + " and the user could try modifying his initial-instance to an earlier time."); 897 return ""; 898 } else { 899 Calendar initInstance = getInitialInstanceCal(); 900 instCount[0] = instCount[0] + end; 901 // Add in the reverse order - newest instance first. 902 for (int i = end; i >= start; i--) { 903 // Tried to avoid the clone. But subtracting datasetFrequency gives different results than multiplying 904 // and Spring DST transition test in TestCoordELfunctions.testCurrent() fails 905 //nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency); 906 nominalInstanceCal = (Calendar) initInstance.clone(); 907 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 908 instCount[0]--; 909 if (nominalInstanceCal.compareTo(initInstance) < 0) { 910 LOG.warn("If the initial instance of the dataset is later than the current-instance specified," 911 + " such as coord:current({0}) in this case, an empty string is returned. This means that" 912 + " no data is available at the current-instance specified by the user and the user could" 913 + " try modifying his initial-instance to an earlier time.", start); 914 break; 915 } 916 else { 917 instanceList.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 918 instanceList.append(CoordELFunctions.INSTANCE_SEPARATOR); 919 } 920 } 921 } 922 923 if (instanceList.length() > 0) { 924 instanceList.setLength(instanceList.length() - CoordELFunctions.INSTANCE_SEPARATOR.length()); 925 } 926 return instanceList.toString(); 927 } 928 929 /** 930 * 931 * @param n offset amount (integer) 932 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 933 * @return the offset time from the effective nominal time <p/> return empty string ("") if the Action_Creation_time or the 934 * offset instance <p/> is earlier than the Initial_Instance of dataset. 935 * @throws Exception 936 */ 937 private static String coord_offset_sync(int n, String timeUnit) throws Exception { 938 Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null); 939 if (rawCal == null) { 940 // warning already logged by resolveOffsetRawTime() 941 return ""; 942 } 943 944 int freq = getDSFrequency(); 945 TimeUnit freqUnit = getDSTimeUnit(); 946 int freqCount = 0; 947 // We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same 948 // time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time 949 // In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true 950 // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that. 951 Calendar cal = getInitialInstanceCal(); 952 if (rawCal.before(cal)) { 953 while (cal.after(rawCal)) { 954 cal.add(freqUnit.getCalendarUnit(), -freq); 955 freqCount--; 956 } 957 } 958 else if (rawCal.after(cal)) { 959 while (cal.before(rawCal)) { 960 cal.add(freqUnit.getCalendarUnit(), freq); 961 freqCount++; 962 } 963 } 964 if (cal.before(rawCal)) { 965 rawCal = cal; 966 } 967 else if (cal.after(rawCal)) { 968 cal.add(freqUnit.getCalendarUnit(), -freq); 969 rawCal = cal; 970 freqCount--; 971 } 972 String rawCalStr = DateUtils.formatDateOozieTZ(rawCal); 973 974 Calendar nominalInstanceCal = getInitialInstanceCal(); 975 nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount); 976 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) { 977 XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the offset instance" 978 + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no" 979 + " data is available at the offset instance specified by the user and the user could try modifying his" 980 + " initial-instance to an earlier time.", n, timeUnit); 981 return ""; 982 } 983 String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal); 984 985 if (!rawCalStr.equals(nominalCalStr)) { 986 throw new RuntimeException("Shouldn't happen"); 987 } 988 return rawCalStr; 989 } 990 991 /** 992 * @param offset 993 * @return n-th available latest instance Date-Time for SYNC data-set 994 * @throws Exception 995 */ 996 private static String coord_latest_sync(int offset) throws Exception { 997 return coord_latestRange_sync(offset, offset); 998 } 999 1000 private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception { 1001 final XLog LOG = XLog.getLog(CoordELFunctions.class); 1002 final Thread currentThread = Thread.currentThread(); 1003 ELEvaluator eval = ELEvaluator.getCurrent(); 1004 String retVal = ""; 1005 int datasetFrequency = (int) getDSFrequency();// in minutes 1006 TimeUnit dsTimeUnit = getDSTimeUnit(); 1007 int[] instCount = new int[1]; 1008 boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false); 1009 Calendar nominalInstanceCal; 1010 if (useCurrentTime) { 1011 nominalInstanceCal = getCurrentInstance(new Date(), instCount); 1012 } 1013 else { 1014 nominalInstanceCal = getCurrentInstance(getActualTime(), instCount); 1015 } 1016 StringBuilder resolvedInstances = new StringBuilder(); 1017 StringBuilder resolvedURIPaths = new StringBuilder(); 1018 if (nominalInstanceCal != null) { 1019 Calendar initInstance = getInitialInstanceCal(); 1020 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1021 if (ds == null) { 1022 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1023 } 1024 String uriTemplate = ds.getUriTemplate(); 1025 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 1026 if (conf == null) { 1027 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 1028 } 1029 int available = 0; 1030 boolean resolved = false; 1031 String user = ParamChecker 1032 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 1033 String doneFlag = ds.getDoneFlag(); 1034 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 1035 URIHandler uriHandler = null; 1036 Context uriContext = null; 1037 try { 1038 while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) { 1039 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 1040 String uriPath = uriEval.evaluate(uriTemplate, String.class); 1041 if (uriHandler == null) { 1042 URI uri = new URI(uriPath); 1043 uriHandler = uriService.getURIHandler(uri); 1044 uriContext = uriHandler.getContext(uri, conf, user); 1045 } 1046 String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); 1047 if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { 1048 XLog.getLog(CoordELFunctions.class) 1049 .debug("Found latest(" + available + "): " + uriWithDoneFlag); 1050 if (available == startOffset) { 1051 LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag); 1052 resolved = true; 1053 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 1054 resolvedURIPaths.append(uriPath); 1055 retVal = resolvedInstances.toString(); 1056 eval.setVariable("resolved_path", resolvedURIPaths.toString()); 1057 break; 1058 } 1059 else if (available <= endOffset) { 1060 LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag); 1061 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append( 1062 INSTANCE_SEPARATOR); 1063 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); 1064 } 1065 1066 available--; 1067 } 1068 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency); 1069 nominalInstanceCal = (Calendar) initInstance.clone(); 1070 instCount[0]--; 1071 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 1072 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 1073 } 1074 } 1075 finally { 1076 if (uriContext != null) { 1077 uriContext.destroy(); 1078 } 1079 } 1080 if (!resolved) { 1081 // return unchanged latest function with variable 'is_resolved' 1082 // to 'false' 1083 eval.setVariable("is_resolved", Boolean.FALSE); 1084 if (startOffset == endOffset) { 1085 retVal = "${coord:latest(" + startOffset + ")}"; 1086 } 1087 else { 1088 retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}"; 1089 } 1090 } 1091 else { 1092 eval.setVariable("is_resolved", Boolean.TRUE); 1093 } 1094 } 1095 else {// No feasible nominal time 1096 eval.setVariable("is_resolved", Boolean.FALSE); 1097 } 1098 return retVal; 1099 } 1100 1101 /** 1102 * @param tm 1103 * @return a new Evaluator to be used for URI-template evaluation 1104 */ 1105 private static ELEvaluator getUriEvaluator(Calendar tm) { 1106 tm.setTimeZone(DateUtils.getOozieProcessingTimeZone()); 1107 ELEvaluator retEval = new ELEvaluator(); 1108 retEval.setVariable("YEAR", tm.get(Calendar.YEAR)); 1109 retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm 1110 .get(Calendar.MONTH) + 1)); 1111 retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm 1112 .get(Calendar.DAY_OF_MONTH)); 1113 retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm 1114 .get(Calendar.HOUR_OF_DAY)); 1115 retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm 1116 .get(Calendar.MINUTE)); 1117 return retEval; 1118 } 1119 1120 /** 1121 * @return whether a data set is SYNCH or ASYNC 1122 */ 1123 private static boolean isSyncDataSet() { 1124 ELEvaluator eval = ELEvaluator.getCurrent(); 1125 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1126 if (ds == null) { 1127 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1128 } 1129 return ds.getType().equalsIgnoreCase("SYNC"); 1130 } 1131 1132 /** 1133 * Check whether a function should be resolved. 1134 * 1135 * @param functionName 1136 * @param n 1137 * @return null if the functionName needs to be resolved otherwise return the calling function unresolved. 1138 */ 1139 private static String checkIfResolved(String functionName, String n) { 1140 ELEvaluator eval = ELEvaluator.getCurrent(); 1141 String replace = (String) eval.getVariable("resolve_" + functionName); 1142 if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't 1143 // resolve 1144 // return "${coord:" + functionName + "(" + n +")}"; //Unresolved 1145 eval.setVariable(".wrap", "true"); 1146 return "coord:" + functionName + "(" + n + ")"; // Unresolved 1147 } 1148 return null; // Resolved it 1149 } 1150 1151 private static String echoUnResolved(String functionName, String n) { 1152 return echoUnResolvedPre(functionName, n, "coord:"); 1153 } 1154 1155 private static String echoUnResolvedPre(String functionName, String n, String prefix) { 1156 ELEvaluator eval = ELEvaluator.getCurrent(); 1157 eval.setVariable(".wrap", "true"); 1158 return prefix + functionName + "(" + n + ")"; // Unresolved 1159 } 1160 1161 /** 1162 * @return the initial instance of a DataSet in DATE 1163 */ 1164 private static Date getInitialInstance() { 1165 ELEvaluator eval = ELEvaluator.getCurrent(); 1166 return getInitialInstance(eval); 1167 } 1168 1169 /** 1170 * @return the initial instance of a DataSet in DATE 1171 */ 1172 private static Date getInitialInstance(ELEvaluator eval) { 1173 return getInitialInstanceCal(eval).getTime(); 1174 // return ds.getInitInstance(); 1175 } 1176 1177 /** 1178 * @return the initial instance of a DataSet in Calendar 1179 */ 1180 private static Calendar getInitialInstanceCal() { 1181 ELEvaluator eval = ELEvaluator.getCurrent(); 1182 return getInitialInstanceCal(eval); 1183 } 1184 1185 /** 1186 * @return the initial instance of a DataSet in Calendar 1187 */ 1188 private static Calendar getInitialInstanceCal(ELEvaluator eval) { 1189 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1190 if (ds == null) { 1191 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1192 } 1193 Calendar effInitTS = Calendar.getInstance(); 1194 effInitTS.setTime(ds.getInitInstance()); 1195 effInitTS.setTimeZone(ds.getTimeZone()); 1196 // To adjust EOD/EOM 1197 DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval)); 1198 return effInitTS; 1199 // return ds.getInitInstance(); 1200 } 1201 1202 /** 1203 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 1204 */ 1205 private static Date getActionCreationtime() { 1206 ELEvaluator eval = ELEvaluator.getCurrent(); 1207 return getActionCreationtime(eval); 1208 } 1209 1210 /** 1211 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 1212 */ 1213 private static Date getActionCreationtime(ELEvaluator eval) { 1214 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1215 if (coordAction == null) { 1216 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1217 } 1218 return coordAction.getNominalTime(); 1219 } 1220 1221 /** 1222 * @return Actual Time when all the dependencies of an application instance are met. 1223 */ 1224 private static Date getActualTime() { 1225 ELEvaluator eval = ELEvaluator.getCurrent(); 1226 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1227 if (coordAction == null) { 1228 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1229 } 1230 return coordAction.getActualTime(); 1231 } 1232 1233 /** 1234 * @return TimeZone for the application or job. 1235 */ 1236 private static TimeZone getJobTZ() { 1237 ELEvaluator eval = ELEvaluator.getCurrent(); 1238 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1239 if (coordAction == null) { 1240 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1241 } 1242 return coordAction.getTimeZone(); 1243 } 1244 1245 /** 1246 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1247 * 1248 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1249 * the dataset. 1250 */ 1251 public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) { 1252 ELEvaluator eval = ELEvaluator.getCurrent(); 1253 return getCurrentInstance(effectiveTime, instanceCount, eval); 1254 } 1255 1256 /** 1257 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1258 * 1259 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1260 * the dataset. 1261 */ 1262 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) { 1263 Date datasetInitialInstance = getInitialInstance(eval); 1264 TimeUnit dsTimeUnit = getDSTimeUnit(eval); 1265 TimeZone dsTZ = getDatasetTZ(eval); 1266 int dsFreq = getDSFrequency(eval); 1267 // Convert Date to Calendar for corresponding TZ 1268 Calendar current = Calendar.getInstance(); 1269 current.setTime(datasetInitialInstance); 1270 current.setTimeZone(dsTZ); 1271 1272 Calendar calEffectiveTime = Calendar.getInstance(); 1273 calEffectiveTime.setTime(effectiveTime); 1274 calEffectiveTime.setTimeZone(dsTZ); 1275 if (instanceCount == null) { // caller doesn't care about this value 1276 instanceCount = new int[1]; 1277 } 1278 instanceCount[0] = 0; 1279 if (current.compareTo(calEffectiveTime) > 0) { 1280 return null; 1281 } 1282 Calendar origCurrent = (Calendar) current.clone(); 1283 while (current.compareTo(calEffectiveTime) <= 0) { 1284 current = (Calendar) origCurrent.clone(); 1285 instanceCount[0]++; 1286 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1287 } 1288 instanceCount[0]--; 1289 1290 current = (Calendar) origCurrent.clone(); 1291 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1292 return current; 1293 } 1294 1295 public static Calendar getEffectiveNominalTime() { 1296 Date datasetInitialInstance = getInitialInstance(); 1297 TimeZone dsTZ = getDatasetTZ(); 1298 // Convert Date to Calendar for corresponding TZ 1299 Calendar current = Calendar.getInstance(); 1300 current.setTime(datasetInitialInstance); 1301 current.setTimeZone(dsTZ); 1302 1303 Calendar calEffectiveTime = Calendar.getInstance(); 1304 calEffectiveTime.setTime(getActionCreationtime()); 1305 calEffectiveTime.setTimeZone(dsTZ); 1306 if (current.compareTo(calEffectiveTime) > 0) { 1307 // Nominal Time < initial Instance 1308 // TODO: getClass() call doesn't work from static method. 1309 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+ 1310 // current.getTime()); 1311 return null; 1312 } 1313 return calEffectiveTime; 1314 } 1315 1316 /** 1317 * @return dataset frequency in minutes 1318 */ 1319 private static int getDSFrequency() { 1320 ELEvaluator eval = ELEvaluator.getCurrent(); 1321 return getDSFrequency(eval); 1322 } 1323 1324 /** 1325 * @return dataset frequency in minutes 1326 */ 1327 private static int getDSFrequency(ELEvaluator eval) { 1328 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1329 if (ds == null) { 1330 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1331 } 1332 return ds.getFrequency(); 1333 } 1334 1335 /** 1336 * @return dataset TimeUnit 1337 */ 1338 private static TimeUnit getDSTimeUnit() { 1339 ELEvaluator eval = ELEvaluator.getCurrent(); 1340 return getDSTimeUnit(eval); 1341 } 1342 1343 /** 1344 * @return dataset TimeUnit 1345 */ 1346 private static TimeUnit getDSTimeUnit(ELEvaluator eval) { 1347 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1348 if (ds == null) { 1349 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1350 } 1351 return ds.getTimeUnit(); 1352 } 1353 1354 /** 1355 * @return dataset TimeZone 1356 */ 1357 public static TimeZone getDatasetTZ() { 1358 ELEvaluator eval = ELEvaluator.getCurrent(); 1359 return getDatasetTZ(eval); 1360 } 1361 1362 /** 1363 * @return dataset TimeZone 1364 */ 1365 private static TimeZone getDatasetTZ(ELEvaluator eval) { 1366 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1367 if (ds == null) { 1368 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1369 } 1370 return ds.getTimeZone(); 1371 } 1372 1373 /** 1374 * @return dataset TimeUnit 1375 */ 1376 private static TimeUnit getDSEndOfFlag() { 1377 ELEvaluator eval = ELEvaluator.getCurrent(); 1378 return getDSEndOfFlag(eval); 1379 } 1380 1381 /** 1382 * @return dataset TimeUnit 1383 */ 1384 private static TimeUnit getDSEndOfFlag(ELEvaluator eval) { 1385 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1386 if (ds == null) { 1387 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1388 } 1389 return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration(); 1390 } 1391 1392 /** 1393 * Return a job configuration property for the coordinator. 1394 * 1395 * @param property property name. 1396 * @return the value of the property, <code>null</code> if the property is undefined. 1397 */ 1398 public static String coord_conf(String property) { 1399 ELEvaluator eval = ELEvaluator.getCurrent(); 1400 return (String) eval.getVariable(property); 1401 } 1402 1403 /** 1404 * Return the user that submitted the coordinator job. 1405 * 1406 * @return the user that submitted the coordinator job. 1407 */ 1408 public static String coord_user() { 1409 ELEvaluator eval = ELEvaluator.getCurrent(); 1410 return (String) eval.getVariable(OozieClient.USER_NAME); 1411 } 1412 1413 /** 1414 * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur 1415 * between them. The caller should make sure that startCal is earlier than endCal. 1416 * <p> 1417 * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal 1418 * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60 1419 * 1420 * @param startCal The earlier offset time 1421 * @param endCal The later offset time 1422 * @param eval The ELEvaluator to use; cannot be null 1423 * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal 1424 */ 1425 public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) { 1426 List<Integer> expandedFreqs = new ArrayList<Integer>(); 1427 // Use eval because the "current" eval isn't set 1428 int freq = getDSFrequency(eval); 1429 TimeUnit freqUnit = getDSTimeUnit(eval); 1430 Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval); 1431 int totalFreq = 0; 1432 if (startCal.before(cal)) { 1433 while (cal.after(startCal)) { 1434 cal.add(freqUnit.getCalendarUnit(), -freq); 1435 totalFreq += -freq; 1436 } 1437 if (cal.before(startCal)) { 1438 cal.add(freqUnit.getCalendarUnit(), freq); 1439 totalFreq += freq; 1440 } 1441 } 1442 else if (startCal.after(cal)) { 1443 while (cal.before(startCal)) { 1444 cal.add(freqUnit.getCalendarUnit(), freq); 1445 totalFreq += freq; 1446 } 1447 } 1448 // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the 1449 // effective nominal time. Now we can find all of the instances that occur between startCal and endCal, inclusive. 1450 while (cal.before(endCal) || cal.equals(endCal)) { 1451 expandedFreqs.add(totalFreq); 1452 cal.add(freqUnit.getCalendarUnit(), freq); 1453 totalFreq += freq; 1454 } 1455 return expandedFreqs; 1456 } 1457 1458 /** 1459 * Resolve the offset time from the effective nominal time 1460 * 1461 * @param n offset amount (integer) 1462 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 1463 * @param eval The ELEvaluator to use; or null to use the "current" eval 1464 * @return A Calendar of the offset time 1465 */ 1466 public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) { 1467 // Use eval if given (for when the "current" eval isn't set) 1468 Calendar cal; 1469 if (eval == null) { 1470 cal = getCurrentInstance(getActionCreationtime(), null); 1471 } 1472 else { 1473 cal = getCurrentInstance(getActionCreationtime(eval), null, eval); 1474 } 1475 if (cal == null) { 1476 XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the nominal time, an" 1477 + " empty string is returned. This means that no data is available at the offset instance specified by the user" 1478 + " and the user could try modifying his or her initial-instance to an earlier time."); 1479 return null; 1480 } 1481 cal.add(timeUnit.getCalendarUnit(), n); 1482 return cal; 1483 } 1484 }